From 7388049ad2c2f40625ce1ac688b029897a9ff8f1 Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Mon, 28 Oct 2024 14:41:52 +0100 Subject: [PATCH] Batching v2 (#765) * feat: switch id empty/check to static inline * feat: use batch count instead of batch vec * refactor: batch don't copy messages * feat: rename batch api * feat: group common transport fields * feat: use common struct in tx functions * feat: rename multicast tx functions * fix: unreachable * refactor: fuse unicast and multicast tx --- include/zenoh-pico/api/primitives.h | 6 +- include/zenoh-pico/protocol/core.h | 5 +- include/zenoh-pico/session/utils.h | 3 - include/zenoh-pico/transport/common/tx.h | 4 + .../transport/multicast/transport.h | 21 -- include/zenoh-pico/transport/multicast/tx.h | 26 -- include/zenoh-pico/transport/raweth/tx.h | 2 +- include/zenoh-pico/transport/transport.h | 141 ++++----- .../zenoh-pico/transport/unicast/transport.h | 25 +- include/zenoh-pico/transport/unicast/tx.h | 26 -- src/api/api.c | 6 +- src/net/primitives.c | 1 + src/net/query.c | 1 + src/protocol/core.c | 29 +- src/session/interest.c | 1 + src/session/rx.c | 1 + src/session/tx.c | 61 ---- src/session/utils.c | 6 +- src/transport/common/tx.c | 281 ++++++++++++++++- src/transport/multicast.c | 1 - src/transport/multicast/lease.c | 35 +-- src/transport/multicast/read.c | 51 ++-- src/transport/multicast/rx.c | 49 +-- src/transport/multicast/transport.c | 80 +++-- src/transport/multicast/tx.c | 287 ----------------- src/transport/raweth/read.c | 16 +- src/transport/raweth/rx.c | 26 +- src/transport/raweth/tx.c | 80 ++--- src/transport/transport.c | 19 +- src/transport/unicast.c | 1 - src/transport/unicast/lease.c | 32 +- src/transport/unicast/read.c | 53 ++-- src/transport/unicast/rx.c | 53 ++-- src/transport/unicast/transport.c | 76 +++-- src/transport/unicast/tx.c | 289 ------------------ 35 files changed, 645 insertions(+), 1149 deletions(-) delete mode 100644 include/zenoh-pico/transport/multicast/tx.h delete 
mode 100644 include/zenoh-pico/transport/unicast/tx.h delete mode 100644 src/session/tx.c delete mode 100644 src/transport/multicast/tx.c delete mode 100644 src/transport/unicast/tx.c diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index 07c1e4e35..a944149c6 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -2060,7 +2060,7 @@ const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subs /** * Activate the batching mechanism. * Any message that would have been sent on the network by a subsequent api call (e.g z_put, z_get) - * will be instead stored until the batch is flushed with :c:func:`zp_batch_flush`. + * will be instead stored until the batch is full or batching is stopped with :c:func:`zp_batch_stop`. * * Parameters: * zs: Pointer to a :c:type:`z_loaned_session_t` that will start batching messages. @@ -2071,7 +2071,7 @@ const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subs z_result_t zp_batch_start(const z_loaned_session_t *zs); /** - * Deactivate the batching mechanism and flush the messages that were stored. + * Deactivate the batching mechanism and flush the remaining messages. * * Parameters: * zs: Pointer to a :c:type:`z_loaned_session_t` that will stop batching messages. @@ -2079,7 +2079,7 @@ z_result_t zp_batch_start(const z_loaned_session_t *zs); * Return: * ``0`` if batching stopped and batch successfully sent, ``negative value`` otherwise. 
*/ -z_result_t zp_batch_flush(const z_loaned_session_t *zs); +z_result_t zp_batch_stop(const z_loaned_session_t *zs); #endif /************* Multi Thread Tasks helpers **************/ diff --git a/include/zenoh-pico/protocol/core.h b/include/zenoh-pico/protocol/core.h index 34970275f..366659e39 100644 --- a/include/zenoh-pico/protocol/core.h +++ b/include/zenoh-pico/protocol/core.h @@ -54,9 +54,10 @@ typedef size_t _z_zint_t; typedef struct { uint8_t id[16]; } _z_id_t; +extern const _z_id_t empty_id; uint8_t _z_id_len(_z_id_t id); -bool _z_id_check(_z_id_t id); -_z_id_t _z_id_empty(void); +static inline bool _z_id_check(_z_id_t id) { return memcmp(&id, &empty_id, sizeof(id)) != 0; } +static inline _z_id_t _z_id_empty(void) { return (_z_id_t){0}; } /** * A zenoh timestamp. diff --git a/include/zenoh-pico/session/utils.h b/include/zenoh-pico/session/utils.h index 2e3a47987..6d8c381b2 100644 --- a/include/zenoh-pico/session/utils.h +++ b/include/zenoh-pico/session/utils.h @@ -31,9 +31,6 @@ void _z_session_clear(_z_session_t *zn); z_result_t _z_session_close(_z_session_t *zn, uint8_t reason); z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *z_msg, uint16_t local_peer_id); -z_result_t _z_send_n_msg(_z_session_t *zn, _z_network_message_t *n_msg, z_reliability_t reliability, - z_congestion_control_t cong_ctrl); -z_result_t _z_send_n_batch(_z_session_t *zn, z_reliability_t reliability, z_congestion_control_t cong_ctrl); #if Z_FEATURE_MULTI_THREAD == 1 static inline void _z_session_mutex_lock(_z_session_t *zn) { (void)_z_mutex_lock(&zn->_mutex_inner); } diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index 1eb4abac1..5a7b78b4d 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -26,7 +26,11 @@ void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t 
*src, z_reliability_t reliability, size_t sn); /*------------------ Transmission and Reception helpers ------------------*/ +z_result_t _z_transport_tx_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg); z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg); z_result_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg); +z_result_t _z_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, z_reliability_t reliability, + z_congestion_control_t cong_ctrl); +z_result_t _z_send_n_batch(_z_session_t *zn, z_congestion_control_t cong_ctrl); #endif /* ZENOH_PICO_TRANSPORT_TX_H */ diff --git a/include/zenoh-pico/transport/multicast/transport.h b/include/zenoh-pico/transport/multicast/transport.h index 802393e53..4947ac894 100644 --- a/include/zenoh-pico/transport/multicast/transport.h +++ b/include/zenoh-pico/transport/multicast/transport.h @@ -28,31 +28,10 @@ z_result_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t r void _z_multicast_transport_clear(_z_transport_t *zt); #if (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) && Z_FEATURE_MULTI_THREAD == 1 -static inline z_result_t _z_multicast_tx_mutex_lock(_z_transport_multicast_t *ztm, bool block) { - if (block) { - _z_mutex_lock(&ztm->_mutex_tx); - return _Z_RES_OK; - } else { - return _z_mutex_try_lock(&ztm->_mutex_tx); - } -} -static inline void _z_multicast_tx_mutex_unlock(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_tx); } -static inline void _z_multicast_rx_mutex_lock(_z_transport_multicast_t *ztm) { _z_mutex_lock(&ztm->_mutex_rx); } -static inline void _z_multicast_rx_mutex_unlock(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_rx); } static inline void _z_multicast_peer_mutex_lock(_z_transport_multicast_t *ztm) { _z_mutex_lock(&ztm->_mutex_peer); } static inline void _z_multicast_peer_mutex_unlock(_z_transport_multicast_t *ztm) { 
_z_mutex_unlock(&ztm->_mutex_peer); } - #else -static inline z_result_t _z_multicast_tx_mutex_lock(_z_transport_multicast_t *ztm, bool block) { - _ZP_UNUSED(ztm); - _ZP_UNUSED(block); - return _Z_RES_OK; -} -static inline void _z_multicast_tx_mutex_unlock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); } -static inline void _z_multicast_rx_mutex_lock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); } -static inline void _z_multicast_rx_mutex_unlock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); } static inline void _z_multicast_peer_mutex_lock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); } static inline void _z_multicast_peer_mutex_unlock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); } - #endif // (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) && Z_FEATURE_MULTI_THREAD == 1 #endif /* ZENOH_PICO_MULTICAST_TRANSPORT_H */ diff --git a/include/zenoh-pico/transport/multicast/tx.h b/include/zenoh-pico/transport/multicast/tx.h deleted file mode 100644 index 836b441b6..000000000 --- a/include/zenoh-pico/transport/multicast/tx.h +++ /dev/null @@ -1,26 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
-// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#ifndef ZENOH_PICO_MULTICAST_TX_H -#define ZENOH_PICO_MULTICAST_TX_H - -#include "zenoh-pico/net/session.h" -#include "zenoh-pico/transport/transport.h" - -z_result_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability, - z_congestion_control_t cong_ctrl); -z_result_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg); -z_result_t _z_multicast_send_n_batch(_z_session_t *zn, z_reliability_t reliability, z_congestion_control_t cong_ctrl); - -#endif /* ZENOH_PICO_MULTICAST_TX_H */ diff --git a/include/zenoh-pico/transport/raweth/tx.h b/include/zenoh-pico/transport/raweth/tx.h index 83abdd9fb..9b18d2004 100644 --- a/include/zenoh-pico/transport/raweth/tx.h +++ b/include/zenoh-pico/transport/raweth/tx.h @@ -21,6 +21,6 @@ z_result_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg); z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability, z_congestion_control_t cong_ctrl); -z_result_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg); +z_result_t _z_raweth_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg); #endif /* ZENOH_PICO_RAWETH_TX_H */ diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index 1efc9ccd0..e6c97e5d3 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -37,25 +37,23 @@ enum _z_batching_state_e { }; typedef struct { -#if Z_FEATURE_FRAGMENTATION == 1 - // Defragmentation buffers - uint8_t _state_reliable; - uint8_t _state_best_effort; - _z_wbuf_t _dbuf_reliable; - _z_wbuf_t _dbuf_best_effort; -#endif - _z_id_t _remote_zid; _z_slice_t _remote_addr; _z_conduit_sn_list_t _sn_rx_sns; - // SN numbers _z_zint_t 
_sn_res; volatile _z_zint_t _lease; volatile _z_zint_t _next_lease; - uint16_t _peer_id; volatile bool _received; + +#if Z_FEATURE_FRAGMENTATION == 1 + // Defragmentation buffers + uint8_t _state_reliable; + uint8_t _state_best_effort; + _z_wbuf_t _dbuf_reliable; + _z_wbuf_t _dbuf_best_effort; +#endif } _z_transport_peer_entry_t; size_t _z_transport_peer_entry_size(const _z_transport_peer_entry_t *src); @@ -71,107 +69,64 @@ _z_transport_peer_entry_list_t *_z_transport_peer_entry_list_insert(_z_transport // Forward type declaration to avoid cyclical include typedef struct _z_session_rc_t _z_session_rc_ref_t; -// Forward declaration to be used in _zp_f_send_tmsg* -typedef struct _z_transport_multicast_t _z_transport_multicast_t; -// Send function prototype -typedef z_result_t (*_zp_f_send_tmsg)(_z_transport_multicast_t *self, const _z_transport_message_t *t_msg); - typedef struct { - // Session associated to the transport _z_session_rc_ref_t *_session; - -#if Z_FEATURE_MULTI_THREAD == 1 - // TX and RX mutexes - _z_mutex_t _mutex_rx; - _z_mutex_t _mutex_tx; -#endif // Z_FEATURE_MULTI_THREAD == 1 - _z_link_t _link; - -#if Z_FEATURE_FRAGMENTATION == 1 - // Defragmentation buffer - uint8_t _state_reliable; - uint8_t _state_best_effort; - _z_wbuf_t _dbuf_reliable; - _z_wbuf_t _dbuf_best_effort; -#endif - - // Regular Buffers + // TX and RX buffers _z_wbuf_t _wbuf; _z_zbuf_t _zbuf; - - _z_id_t _remote_zid; - // SN numbers _z_zint_t _sn_res; _z_zint_t _sn_tx_reliable; _z_zint_t _sn_tx_best_effort; - _z_zint_t _sn_rx_reliable; - _z_zint_t _sn_rx_best_effort; volatile _z_zint_t _lease; - -// Transport batching -#if Z_FEATURE_BATCHING == 1 - uint8_t _batch_state; - _z_network_message_vec_t _batch; -#endif - -#if Z_FEATURE_MULTI_THREAD == 1 - _z_task_t *_read_task; - _z_task_t *_lease_task; - volatile bool _read_task_running; - volatile bool _lease_task_running; -#endif // Z_FEATURE_MULTI_THREAD == 1 - - volatile bool _received; volatile bool _transmitted; -} 
_z_transport_unicast_t; - -typedef struct _z_transport_multicast_t { - // Session associated to the transport - _z_session_rc_ref_t *_session; - #if Z_FEATURE_MULTI_THREAD == 1 // TX and RX mutexes _z_mutex_t _mutex_rx; _z_mutex_t _mutex_tx; - // Peer list mutex - _z_mutex_t _mutex_peer; -#endif // Z_FEATURE_MULTI_THREAD == 1 - + _z_task_t *_read_task; + _z_task_t *_lease_task; + volatile bool _read_task_running; + volatile bool _lease_task_running; +#endif // Transport batching #if Z_FEATURE_BATCHING == 1 uint8_t _batch_state; - _z_network_message_vec_t _batch; + size_t _batch_count; #endif +} _z_transport_common_t; - _z_link_t _link; +// Send function prototype +typedef z_result_t (*_zp_f_send_tmsg)(_z_transport_common_t *self, const _z_transport_message_t *t_msg); - // TX and RX buffers - _z_wbuf_t _wbuf; - _z_zbuf_t _zbuf; +typedef struct { + _z_transport_common_t _common; + _z_id_t _remote_zid; + _z_zint_t _sn_rx_reliable; + _z_zint_t _sn_rx_best_effort; + volatile bool _received; - // SN initial numbers - _z_zint_t _sn_res; - _z_zint_t _sn_tx_reliable; - _z_zint_t _sn_tx_best_effort; - volatile _z_zint_t _lease; +#if Z_FEATURE_FRAGMENTATION == 1 + // Defragmentation buffer + uint8_t _state_reliable; + uint8_t _state_best_effort; + _z_wbuf_t _dbuf_reliable; + _z_wbuf_t _dbuf_best_effort; +#endif +} _z_transport_unicast_t; +typedef struct _z_transport_multicast_t { + _z_transport_common_t _common; // Known valid peers _z_transport_peer_entry_list_t *_peers; - // T message send function _zp_f_send_tmsg _send_f; #if Z_FEATURE_MULTI_THREAD == 1 - _z_task_t *_read_task; - _z_task_t *_lease_task; - volatile bool _read_task_running; - volatile bool _lease_task_running; -#endif // Z_FEATURE_MULTI_THREAD == 1 - - volatile bool _transmitted; + _z_mutex_t _mutex_peer; // Peer list mutex +#endif } _z_transport_multicast_t; typedef struct { @@ -214,4 +169,26 @@ bool _z_transport_start_batching(_z_transport_t *zt); void _z_transport_stop_batching(_z_transport_t *zt); #endif 
-#endif /* INCLUDE_ZENOH_PICO_TRANSPORT_TRANSPORT_H */ +#if Z_FEATURE_MULTI_THREAD == 1 +static inline z_result_t _z_transport_tx_mutex_lock(_z_transport_common_t *ztc, bool block) { + if (block) { + _z_mutex_lock(&ztc->_mutex_tx); + return _Z_RES_OK; + } else { + return _z_mutex_try_lock(&ztc->_mutex_tx); + } +} +static inline void _z_transport_tx_mutex_unlock(_z_transport_common_t *ztc) { _z_mutex_unlock(&ztc->_mutex_tx); } +static inline void _z_transport_rx_mutex_lock(_z_transport_common_t *ztc) { _z_mutex_lock(&ztc->_mutex_rx); } +static inline void _z_transport_rx_mutex_unlock(_z_transport_common_t *ztc) { _z_mutex_unlock(&ztc->_mutex_rx); } +#else +static inline z_result_t _z_transport_tx_mutex_lock(_z_transport_common_t *ztc, bool block) { + _ZP_UNUSED(ztc); + _ZP_UNUSED(block); + return _Z_RES_OK; +} +static inline void _z_transport_tx_mutex_unlock(_z_transport_common_t *ztc) { _ZP_UNUSED(ztc); } +static inline void _z_transport_rx_mutex_lock(_z_transport_common_t *ztc) { _ZP_UNUSED(ztc); } +static inline void _z_transport_rx_mutex_unlock(_z_transport_common_t *ztc) { _ZP_UNUSED(ztc); } +#endif // Z_FEATURE_MULTI_THREAD == 1 +#endif /* INCLUDE_ZENOH_PICO_TRANSPORT_TRANSPORT_H */ diff --git a/include/zenoh-pico/transport/unicast/transport.h b/include/zenoh-pico/transport/unicast/transport.h index d6eeb1599..b2ab36425 100644 --- a/include/zenoh-pico/transport/unicast/transport.h +++ b/include/zenoh-pico/transport/unicast/transport.h @@ -27,27 +27,4 @@ z_result_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, bo z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason); void _z_unicast_transport_clear(_z_transport_t *zt); -#if Z_FEATURE_UNICAST_TRANSPORT == 1 && Z_FEATURE_MULTI_THREAD == 1 -static inline z_result_t _z_unicast_tx_mutex_lock(_z_transport_unicast_t *ztu, bool block) { - if (block) { - _z_mutex_lock(&ztu->_mutex_tx); - return _Z_RES_OK; - } else { - return _z_mutex_try_lock(&ztu->_mutex_tx); - } -} 
-static inline void _z_unicast_tx_mutex_unlock(_z_transport_unicast_t *ztu) { _z_mutex_unlock(&ztu->_mutex_tx); } -static inline void _z_unicast_rx_mutex_lock(_z_transport_unicast_t *ztu) { _z_mutex_lock(&ztu->_mutex_rx); } -static inline void _z_unicast_rx_mutex_unlock(_z_transport_unicast_t *ztu) { _z_mutex_unlock(&ztu->_mutex_rx); } - -#else -static inline z_result_t _z_unicast_tx_mutex_lock(_z_transport_unicast_t *ztu, bool block) { - _ZP_UNUSED(ztu); - _ZP_UNUSED(block); - return _Z_RES_OK; -} -static inline void _z_unicast_tx_mutex_unlock(_z_transport_unicast_t *ztu) { _ZP_UNUSED(ztu); } -static inline void _z_unicast_rx_mutex_lock(_z_transport_unicast_t *ztu) { _ZP_UNUSED(ztu); } -static inline void _z_unicast_rx_mutex_unlock(_z_transport_unicast_t *ztu) { _ZP_UNUSED(ztu); } -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 && Z_FEATURE_MULTI_THREAD == 1 -#endif /* ZENOH_PICO_UNICAST_TRANSPORT_H */ +#endif /* ZENOH_PICO_UNICAST_TRANSPORT_H */ diff --git a/include/zenoh-pico/transport/unicast/tx.h b/include/zenoh-pico/transport/unicast/tx.h deleted file mode 100644 index 41c991596..000000000 --- a/include/zenoh-pico/transport/unicast/tx.h +++ /dev/null @@ -1,26 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
-// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#ifndef ZENOH_PICO_UNICAST_TX_H -#define ZENOH_PICO_UNICAST_TX_H - -#include "zenoh-pico/net/session.h" -#include "zenoh-pico/transport/transport.h" - -z_result_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability, - z_congestion_control_t cong_ctrl); -z_result_t _z_unicast_send_t_msg(_z_transport_unicast_t *ztu, const _z_transport_message_t *t_msg); -z_result_t _z_unicast_send_n_batch(_z_session_t *zn, z_reliability_t reliability, z_congestion_control_t cong_ctrl); - -#endif /* ZENOH_PICO_TRANSPORT_LINK_TX_H */ diff --git a/src/api/api.c b/src/api/api.c index a2ed97a53..2056d956c 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -39,6 +39,7 @@ #include "zenoh-pico/session/utils.h" #include "zenoh-pico/system/platform.h" #include "zenoh-pico/system/platform_common.h" +#include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/transport/multicast.h" #include "zenoh-pico/transport/unicast.h" #include "zenoh-pico/utils/endianness.h" @@ -1440,13 +1441,14 @@ z_result_t zp_batch_start(const z_loaned_session_t *zs) { return _z_transport_start_batching(&session->_tp) ? 
_Z_RES_OK : _Z_ERR_GENERIC; } -z_result_t zp_batch_flush(const z_loaned_session_t *zs) { +z_result_t zp_batch_stop(const z_loaned_session_t *zs) { _z_session_t *session = _Z_RC_IN_VAL(zs); if (_Z_RC_IS_NULL(zs)) { return _Z_ERR_SESSION_CLOSED; } _z_transport_stop_batching(&session->_tp); - return _z_send_n_batch(session, Z_RELIABILITY_DEFAULT, Z_CONGESTION_CONTROL_DEFAULT); + // Send remaining batch + return _z_send_n_batch(session, Z_CONGESTION_CONTROL_DEFAULT); } #endif diff --git a/src/net/primitives.c b/src/net/primitives.c index c2232d6b4..4e235d0d4 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -33,6 +33,7 @@ #include "zenoh-pico/session/resource.h" #include "zenoh-pico/session/subscription.h" #include "zenoh-pico/session/utils.h" +#include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/result.h" diff --git a/src/net/query.c b/src/net/query.c index 9230c1dde..a4b41b6aa 100644 --- a/src/net/query.c +++ b/src/net/query.c @@ -14,6 +14,7 @@ #include "zenoh-pico/net/query.h" #include "zenoh-pico/session/utils.h" +#include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/utils/logging.h" void _z_query_clear_inner(_z_query_t *q) { diff --git a/src/protocol/core.c b/src/protocol/core.c index de0dbf517..4dda7b57c 100644 --- a/src/protocol/core.c +++ b/src/protocol/core.c @@ -27,6 +27,8 @@ #define _Z_ID_LEN (16) +const _z_id_t empty_id = {0}; + uint8_t _z_id_len(_z_id_t id) { uint8_t len = _Z_ID_LEN; while (len > 0) { @@ -38,33 +40,6 @@ uint8_t _z_id_len(_z_id_t id) { } return len; } -bool _z_id_check(_z_id_t id) { - bool ret = false; - for (int i = 0; !ret && i < _Z_ID_LEN; i++) { - ret |= id.id[i] != 0; - } - return ret; -} -_z_id_t _z_id_empty(void) { - return (_z_id_t){.id = { - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - }}; -} uint64_t _z_timestamp_ntp64_from_time(uint32_t seconds, uint32_t nanos) { const uint64_t FRAC_PER_SEC = (uint64_t)1 << 32; diff 
--git a/src/session/interest.c b/src/session/interest.c index 033942773..6bcdbed4c 100644 --- a/src/session/interest.c +++ b/src/session/interest.c @@ -26,6 +26,7 @@ #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/resource.h" #include "zenoh-pico/session/utils.h" +#include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/utils/logging.h" #if Z_FEATURE_INTEREST == 1 diff --git a/src/session/rx.c b/src/session/rx.c index 800f795e3..54dc8983f 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -32,6 +32,7 @@ #include "zenoh-pico/session/session.h" #include "zenoh-pico/session/subscription.h" #include "zenoh-pico/session/utils.h" +#include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/utils/logging.h" /*------------------ Handle message ------------------*/ diff --git a/src/session/tx.c b/src/session/tx.c deleted file mode 100644 index 6401fb623..000000000 --- a/src/session/tx.c +++ /dev/null @@ -1,61 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
-// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#include "zenoh-pico/transport/multicast/tx.h" - -#include "zenoh-pico/transport/raweth/tx.h" -#include "zenoh-pico/transport/unicast/tx.h" -#include "zenoh-pico/utils/logging.h" - -z_result_t _z_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability, - z_congestion_control_t cong_ctrl) { - z_result_t ret = _Z_RES_OK; - // Call transport function - switch (zn->_tp._type) { - case _Z_TRANSPORT_UNICAST_TYPE: - ret = _z_unicast_send_n_msg(zn, z_msg, reliability, cong_ctrl); - break; - case _Z_TRANSPORT_MULTICAST_TYPE: - ret = _z_multicast_send_n_msg(zn, z_msg, reliability, cong_ctrl); - break; - case _Z_TRANSPORT_RAWETH_TYPE: - ret = _z_raweth_send_n_msg(zn, z_msg, reliability, cong_ctrl); - break; - default: - ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; - break; - } - return ret; -} - -z_result_t _z_send_n_batch(_z_session_t *zn, z_reliability_t reliability, z_congestion_control_t cong_ctrl) { - z_result_t ret = _Z_RES_OK; - // Call transport function - switch (zn->_tp._type) { - case _Z_TRANSPORT_UNICAST_TYPE: - ret = _z_unicast_send_n_batch(zn, reliability, cong_ctrl); - break; - case _Z_TRANSPORT_MULTICAST_TYPE: - ret = _z_multicast_send_n_batch(zn, reliability, cong_ctrl); - break; - case _Z_TRANSPORT_RAWETH_TYPE: - _Z_INFO("Batching not yet supported on raweth transport"); - ret = _Z_ERR_TRANSPORT_TX_FAILED; - break; - default: - ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; - break; - } - return ret; -} diff --git a/src/session/utils.c b/src/session/utils.c index f88fcf03d..0a2cff984 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -80,13 +80,13 @@ z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid) { // Note session in transport switch (zn->_tp._type) { case _Z_TRANSPORT_UNICAST_TYPE: - zn->_tp._transport._unicast._session = zsrc; + zn->_tp._transport._unicast._common._session = zsrc; break; case 
_Z_TRANSPORT_MULTICAST_TYPE: - zn->_tp._transport._multicast._session = zsrc; + zn->_tp._transport._multicast._common._session = zsrc; break; case _Z_TRANSPORT_RAWETH_TYPE: - zn->_tp._transport._raweth._session = zsrc; + zn->_tp._transport._raweth._common._session = zsrc; break; default: break; diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 75a837cbe..4b6d9bf32 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -16,15 +16,244 @@ #include "zenoh-pico/api/constants.h" #include "zenoh-pico/protocol/codec/core.h" +#include "zenoh-pico/protocol/codec/network.h" #include "zenoh-pico/protocol/codec/transport.h" #include "zenoh-pico/protocol/definitions/transport.h" -#include "zenoh-pico/transport/multicast/tx.h" #include "zenoh-pico/transport/raweth/tx.h" -#include "zenoh-pico/transport/unicast/tx.h" +#include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/endianness.h" #include "zenoh-pico/utils/logging.h" /*------------------ Transmission helper ------------------*/ + +static _z_zint_t _z_transport_tx_get_sn(_z_transport_common_t *ztc, z_reliability_t reliability) { + _z_zint_t sn; + if (reliability == Z_RELIABILITY_RELIABLE) { + sn = ztc->_sn_tx_reliable; + ztc->_sn_tx_reliable = _z_sn_increment(ztc->_sn_res, ztc->_sn_tx_reliable); + } else { + sn = ztc->_sn_tx_best_effort; + ztc->_sn_tx_best_effort = _z_sn_increment(ztc->_sn_res, ztc->_sn_tx_best_effort); + } + return sn; +} + +#if Z_FEATURE_FRAGMENTATION == 1 +static z_result_t _z_transport_tx_send_fragment_inner(_z_transport_common_t *ztc, _z_wbuf_t *frag_buff, + const _z_network_message_t *n_msg, z_reliability_t reliability, + _z_zint_t first_sn) { + bool is_first = true; + _z_zint_t sn = first_sn; + // Encode message on temp buffer + _Z_RETURN_IF_ERR(_z_network_message_encode(frag_buff, n_msg)); + // Fragment message + while (_z_wbuf_len(frag_buff) > 0) { + // Get fragment sequence number + if (!is_first) { + sn = _z_transport_tx_get_sn(ztc, reliability); 
+ } + is_first = false; + // Serialize fragment + __unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); + z_result_t ret = __unsafe_z_serialize_zenoh_fragment(&ztc->_wbuf, frag_buff, reliability, sn); + if (ret != _Z_RES_OK) { + _Z_ERROR("Fragment serialization failed with err %d", ret); + return ret; + } + // Send fragment + __unsafe_z_finalize_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); + _Z_RETURN_IF_ERR(_z_link_send_wbuf(&ztc->_link, &ztc->_wbuf)); + ztc->_transmitted = true; // Tell session we transmitted data + } + return _Z_RES_OK; +} + +static z_result_t _z_transport_tx_send_fragment(_z_transport_common_t *ztc, const _z_network_message_t *n_msg, + z_reliability_t reliability, _z_zint_t first_sn) { + // Create an expandable wbuf for fragmentation + _z_wbuf_t frag_buff = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); + // Send message as fragments + z_result_t ret = _z_transport_tx_send_fragment_inner(ztc, &frag_buff, n_msg, reliability, first_sn); + // Clear the buffer as it's no longer required + _z_wbuf_clear(&frag_buff); + return ret; +} + +#else +static z_result_t _z_transport_tx_send_fragment(_z_transport_common_t *ztc, const _z_network_message_t *n_msg, + z_reliability_t reliability, _z_zint_t first_sn) { + _ZP_UNUSED(ztc); + /* stale _ZP_UNUSED(fbf) removed: no such parameter in this signature */ + _ZP_UNUSED(n_msg); + _ZP_UNUSED(reliability); + _ZP_UNUSED(first_sn); + _Z_INFO("Sending the message required fragmentation feature that is deactivated."); + return _Z_RES_OK; +} +#endif + +static inline bool _z_transport_tx_batch_has_data(_z_transport_common_t *ztc) { +#if Z_FEATURE_BATCHING == 1 + return (ztc->_batch_state == _Z_BATCHING_ACTIVE) && (ztc->_batch_count > 0); +#else + _ZP_UNUSED(ztc); + return false; +#endif +} + +static z_result_t _z_transport_tx_flush_buffer(_z_transport_common_t *ztc) { + // Send network message + __unsafe_z_finalize_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); + _Z_RETURN_IF_ERR(_z_link_send_wbuf(&ztc->_link, &ztc->_wbuf)); + ztc->_transmitted = true; // Tell session we 
transmitted data +#if Z_FEATURE_BATCHING == 1 + ztc->_batch_count = 0; +#endif + return _Z_RES_OK; +} + +static z_result_t _z_transport_tx_flush_or_incr_batch(_z_transport_common_t *ztc) { +#if Z_FEATURE_BATCHING == 1 + if (ztc->_batch_state == _Z_BATCHING_ACTIVE) { + // Increment batch count + ztc->_batch_count++; + return _Z_RES_OK; + } else { + return _z_transport_tx_flush_buffer(ztc); + } +#else + return _z_transport_tx_flush_buffer(ztc); +#endif +} + +static z_result_t _z_transport_tx_batch_overflow(_z_transport_common_t *ztc, const _z_network_message_t *n_msg, + z_reliability_t reliability, _z_zint_t sn, size_t prev_wpos) { +#if Z_FEATURE_BATCHING == 1 + // Remove partially encoded data + _z_wbuf_set_wpos(&ztc->_wbuf, prev_wpos); + // Send batch + _Z_RETURN_IF_ERR(_z_transport_tx_flush_buffer(ztc)); + // Init buffer + __unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); + sn = _z_transport_tx_get_sn(ztc, reliability); + _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); + _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztc->_wbuf, &t_msg)); + // Retry encode + z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg); + if (ret != _Z_RES_OK) { + // Message still doesn't fit in buffer, send as fragments + return _z_transport_tx_send_fragment(ztc, n_msg, reliability, sn); + } else { + // Increment batch + ztc->_batch_count++; + } + return _Z_RES_OK; +#else + _ZP_UNUSED(ztc); + _ZP_UNUSED(n_msg); + _ZP_UNUSED(reliability); + _ZP_UNUSED(sn); + _ZP_UNUSED(prev_wpos); + return _Z_RES_OK; +#endif +} + +static size_t _z_transport_tx_save_wpos(_z_wbuf_t *wbuf) { +#if Z_FEATURE_BATCHING == 1 + return _z_wbuf_get_wpos(wbuf); +#else + _ZP_UNUSED(wbuf); + return 0; +#endif +} + +static z_result_t _z_transport_tx_send_n_msg_inner(_z_transport_common_t *ztc, const _z_network_message_t *n_msg, + z_reliability_t reliability) { + // Init buffer + _z_zint_t sn = 0; + bool batch_has_data = _z_transport_tx_batch_has_data(ztc); + if 
(!batch_has_data) { + __unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); + sn = _z_transport_tx_get_sn(ztc, reliability); + _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); + _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztc->_wbuf, &t_msg)); + } + // Try encoding the network message + size_t prev_wpos = _z_transport_tx_save_wpos(&ztc->_wbuf); + z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg); + if (ret == _Z_RES_OK) { + // Flush buffer or increase batch + return _z_transport_tx_flush_or_incr_batch(ztc); + } else if (!batch_has_data) { + // Message doesn't fit in buffer, send as fragments + return _z_transport_tx_send_fragment(ztc, n_msg, reliability, sn); + } else { + // Buffer is too full for message + return _z_transport_tx_batch_overflow(ztc, n_msg, reliability, sn, prev_wpos); + } +} + +z_result_t _z_transport_tx_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg) { + z_result_t ret = _Z_RES_OK; + _Z_DEBUG("Send session message"); + _z_transport_tx_mutex_lock(ztc, true); + + // Encode transport message + __unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); + ret = _z_transport_message_encode(&ztc->_wbuf, t_msg); + if (ret == _Z_RES_OK) { + // Send message + __unsafe_z_finalize_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); + ret = _z_link_send_wbuf(&ztc->_link, &ztc->_wbuf); + if (ret == _Z_RES_OK) { + ztc->_transmitted = true; // Tell session we transmitted data + } + } + _z_transport_tx_mutex_unlock(ztc); + return ret; +} + +static z_result_t _z_transport_tx_send_n_msg(_z_transport_common_t *ztc, const _z_network_message_t *n_msg, + z_reliability_t reliability, z_congestion_control_t cong_ctrl) { + z_result_t ret = _Z_RES_OK; + _Z_DEBUG("Send network message"); + + // Acquire the lock and drop the message if needed + ret = _z_transport_tx_mutex_lock(ztc, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK); + if (ret != _Z_RES_OK) { + _Z_INFO("Dropping zenoh message because of congestion 
control"); + return ret; + } + // Process message + ret = _z_transport_tx_send_n_msg_inner(ztc, n_msg, reliability); + _z_transport_tx_mutex_unlock(ztc); + return ret; +} + +static z_result_t _z_transport_tx_send_n_batch(_z_transport_common_t *ztc, z_congestion_control_t cong_ctrl) { +#if Z_FEATURE_BATCHING == 1 + // Check batch size + if (ztc->_batch_count > 0) { + // Acquire the lock and drop the message if needed + z_result_t ret = _z_transport_tx_mutex_lock(ztc, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK); + if (ret != _Z_RES_OK) { + _Z_INFO("Dropping zenoh batch because of congestion control"); + return ret; + } + // Send batch + _Z_DEBUG("Send network batch"); + ret = _z_transport_tx_flush_buffer(ztc); + _z_transport_tx_mutex_unlock(ztc); + return ret; + } + return _Z_RES_OK; +#else + _ZP_UNUSED(ztc); + _ZP_UNUSED(cong_ctrl); + return _Z_RES_OK; +#endif +} + /** * This function is unsafe because it operates in potentially concurrent data. * Make sure that the following mutexes are locked before calling this function: @@ -74,13 +303,13 @@ z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg z_result_t ret = _Z_RES_OK; switch (zt->_type) { case _Z_TRANSPORT_UNICAST_TYPE: - ret = _z_unicast_send_t_msg(&zt->_transport._unicast, t_msg); + ret = _z_transport_tx_send_t_msg(&zt->_transport._unicast._common, t_msg); break; case _Z_TRANSPORT_MULTICAST_TYPE: - ret = _z_multicast_send_t_msg(&zt->_transport._multicast, t_msg); + ret = _z_transport_tx_send_t_msg(&zt->_transport._multicast._common, t_msg); break; case _Z_TRANSPORT_RAWETH_TYPE: - ret = _z_raweth_send_t_msg(&zt->_transport._raweth, t_msg); + ret = _z_raweth_send_t_msg(&zt->_transport._raweth._common, t_msg); break; default: ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; @@ -164,3 +393,45 @@ z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z return ret; } + +z_result_t _z_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability, + 
z_congestion_control_t cong_ctrl) { + z_result_t ret = _Z_RES_OK; + // Call transport function + switch (zn->_tp._type) { + case _Z_TRANSPORT_UNICAST_TYPE: + ret = _z_transport_tx_send_n_msg(&zn->_tp._transport._unicast._common, z_msg, reliability, cong_ctrl); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + ret = _z_transport_tx_send_n_msg(&zn->_tp._transport._multicast._common, z_msg, reliability, cong_ctrl); + break; + case _Z_TRANSPORT_RAWETH_TYPE: + ret = _z_raweth_send_n_msg(zn, z_msg, reliability, cong_ctrl); + break; + default: + ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + break; + } + return ret; +} + +z_result_t _z_send_n_batch(_z_session_t *zn, z_congestion_control_t cong_ctrl) { + z_result_t ret = _Z_RES_OK; + // Call transport function + switch (zn->_tp._type) { + case _Z_TRANSPORT_UNICAST_TYPE: + ret = _z_transport_tx_send_n_batch(&zn->_tp._transport._unicast._common, cong_ctrl); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + ret = _z_transport_tx_send_n_batch(&zn->_tp._transport._multicast._common, cong_ctrl); + break; + case _Z_TRANSPORT_RAWETH_TYPE: + _Z_INFO("Batching not yet supported on raweth transport"); + ret = _Z_ERR_TRANSPORT_TX_FAILED; + break; + default: + ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + break; + } + return ret; +} diff --git a/src/transport/multicast.c b/src/transport/multicast.c index 42059f6dc..8425131df 100644 --- a/src/transport/multicast.c +++ b/src/transport/multicast.c @@ -24,7 +24,6 @@ #include "zenoh-pico/transport/common/read.h" #include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/transport/multicast/rx.h" -#include "zenoh-pico/transport/multicast/tx.h" #include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" diff --git a/src/transport/multicast/lease.c b/src/transport/multicast/lease.c index b97663c8f..b8f98cd19 100644 --- a/src/transport/multicast/lease.c +++ b/src/transport/multicast/lease.c @@ -27,18 +27,18 @@ z_result_t 
_zp_multicast_send_join(_z_transport_multicast_t *ztm) { _z_conduit_sn_list_t next_sn; next_sn._is_qos = false; - next_sn._val._plain._best_effort = ztm->_sn_tx_best_effort; - next_sn._val._plain._reliable = ztm->_sn_tx_reliable; + next_sn._val._plain._best_effort = ztm->_common._sn_tx_best_effort; + next_sn._val._plain._reliable = ztm->_common._sn_tx_reliable; - _z_id_t zid = _Z_RC_IN_VAL(ztm->_session)->_local_zid; + _z_id_t zid = _Z_RC_IN_VAL(ztm->_common._session)->_local_zid; _z_transport_message_t jsm = _z_t_msg_make_join(Z_WHATAMI_PEER, Z_TRANSPORT_LEASE, zid, next_sn); - return ztm->_send_f(ztm, &jsm); + return ztm->_send_f(&ztm->_common, &jsm); } z_result_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) { _z_transport_message_t t_msg = _z_t_msg_make_keep_alive(); - return ztm->_send_f(ztm, &t_msg); + return ztm->_send_f(&ztm->_common, &t_msg); } #else @@ -91,15 +91,15 @@ static _z_zint_t _z_get_next_lease(_z_transport_peer_entry_list_t *peers) { void *_zp_multicast_lease_task(void *ztm_arg) { _z_transport_multicast_t *ztm = (_z_transport_multicast_t *)ztm_arg; - ztm->_transmitted = false; + ztm->_common._transmitted = false; // From all peers, get the next lease time (minimum) - int next_lease = (int)_z_get_minimum_lease(ztm->_peers, ztm->_lease); + int next_lease = (int)_z_get_minimum_lease(ztm->_peers, ztm->_common._lease); int next_keep_alive = (int)(next_lease / Z_TRANSPORT_LEASE_EXPIRE_FACTOR); int next_join = Z_JOIN_INTERVAL; _z_transport_peer_entry_list_t *it = NULL; - while (ztm->_lease_task_running == true) { + while (ztm->_common._lease_task_running == true) { _z_mutex_lock(&ztm->_mutex_peer); if (next_lease <= 0) { @@ -122,7 +122,7 @@ void *_zp_multicast_lease_task(void *ztm_arg) { if (next_join <= 0) { _zp_multicast_send_join(ztm); - ztm->_transmitted = true; + ztm->_common._transmitted = true; // Reset the join parameters next_join = Z_JOIN_INTERVAL; @@ -130,17 +130,18 @@ void *_zp_multicast_lease_task(void *ztm_arg) { if 
(next_keep_alive <= 0) { // Check if need to send a keep alive - if (ztm->_transmitted == false) { + if (ztm->_common._transmitted == false) { if (_zp_multicast_send_keep_alive(ztm) < 0) { _Z_INFO("Send keep alive failed."); } } // Reset the keep alive parameters - ztm->_transmitted = false; - next_keep_alive = (int)(_z_get_minimum_lease(ztm->_peers, ztm->_lease) / Z_TRANSPORT_LEASE_EXPIRE_FACTOR); + ztm->_common._transmitted = false; + next_keep_alive = + (int)(_z_get_minimum_lease(ztm->_peers, ztm->_common._lease) / Z_TRANSPORT_LEASE_EXPIRE_FACTOR); } // Query timeout process - _z_pending_query_process_timeout(_Z_RC_IN_VAL(ztm->_session)); + _z_pending_query_process_timeout(_Z_RC_IN_VAL(ztm->_common._session)); // Compute the target interval to sleep int interval; @@ -191,19 +192,19 @@ void *_zp_multicast_lease_task(void *ztm_arg) { z_result_t _zp_multicast_start_lease_task(_z_transport_multicast_t *ztm, z_task_attr_t *attr, _z_task_t *task) { // Init memory (void)memset(task, 0, sizeof(_z_task_t)); - ztm->_lease_task_running = true; // Init before z_task_init for concurrency issue + ztm->_common._lease_task_running = true; // Init before z_task_init for concurrency issue // Init task if (_z_task_init(task, attr, _zp_multicast_lease_task, ztm) != _Z_RES_OK) { - ztm->_lease_task_running = false; + ztm->_common._lease_task_running = false; return _Z_ERR_SYSTEM_TASK_FAILED; } // Attach task - ztm->_lease_task = task; + ztm->_common._lease_task = task; return _Z_RES_OK; } z_result_t _zp_multicast_stop_lease_task(_z_transport_multicast_t *ztm) { - ztm->_lease_task_running = false; + ztm->_common._lease_task_running = false; return _Z_RES_OK; } #else diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index 5cc377c5e..ad7a1eb1c 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -58,41 +58,42 @@ void *_zp_multicast_read_task(void *ztm_arg) { _z_transport_multicast_t *ztm = (_z_transport_multicast_t *)ztm_arg; // 
Acquire and keep the lock - _z_mutex_lock(&ztm->_mutex_rx); + _z_mutex_lock(&ztm->_common._mutex_rx); // Prepare the buffer - _z_zbuf_reset(&ztm->_zbuf); + _z_zbuf_reset(&ztm->_common._zbuf); uint8_t addr_buff[_Z_MULTICAST_ADDR_BUFF_SIZE] = {0}; _z_slice_t addr = _z_slice_alias_buf(addr_buff, sizeof(addr_buff)); - while (ztm->_read_task_running == true) { + while (ztm->_common._read_task_running == true) { size_t to_read = 0; // Read bytes from socket to the main buffer - switch (ztm->_link._cap._flow) { + switch (ztm->_common._link._cap._flow) { case Z_LINK_CAP_FLOW_STREAM: - if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); - if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_zbuf_compact(&ztm->_zbuf); + if (_z_zbuf_len(&ztm->_common._zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_link_recv_zbuf(&ztm->_common._link, &ztm->_common._zbuf, &addr); + if (_z_zbuf_len(&ztm->_common._zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_zbuf_compact(&ztm->_common._zbuf); continue; } } // Get stream size - to_read = _z_read_stream_size(&ztm->_zbuf); + to_read = _z_read_stream_size(&ztm->_common._zbuf); // Read data - if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, NULL); - if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztm->_zbuf); + if (_z_zbuf_len(&ztm->_common._zbuf) < to_read) { + _z_link_recv_zbuf(&ztm->_common._link, &ztm->_common._zbuf, NULL); + if (_z_zbuf_len(&ztm->_common._zbuf) < to_read) { + _z_zbuf_set_rpos(&ztm->_common._zbuf, + _z_zbuf_get_rpos(&ztm->_common._zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztm->_common._zbuf); continue; } } break; case Z_LINK_CAP_FLOW_DATAGRAM: - _z_zbuf_compact(&ztm->_zbuf); - to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); + _z_zbuf_compact(&ztm->_common._zbuf); + to_read = _z_link_recv_zbuf(&ztm->_common._link, &ztm->_common._zbuf, 
&addr); if (to_read == SIZE_MAX) { continue; } @@ -101,7 +102,7 @@ void *_zp_multicast_read_task(void *ztm_arg) { break; } // Wrap the main buffer to_read bytes - _z_zbuf_t zbuf = _z_zbuf_view(&ztm->_zbuf, to_read); + _z_zbuf_t zbuf = _z_zbuf_view(&ztm->_common._zbuf, to_read); while (_z_zbuf_len(&zbuf) > 0) { // Decode one session message @@ -118,37 +119,37 @@ void *_zp_multicast_read_task(void *ztm_arg) { } } else { _Z_ERROR("Connection closed due to malformed message: %d", ret); - ztm->_read_task_running = false; + ztm->_common._read_task_running = false; continue; } } // Move the read position of the read buffer - _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) + to_read); + _z_zbuf_set_rpos(&ztm->_common._zbuf, _z_zbuf_get_rpos(&ztm->_common._zbuf) + to_read); if (_z_multicast_update_rx_buffer(ztm) != _Z_RES_OK) { _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); - ztm->_read_task_running = false; + ztm->_common._read_task_running = false; } } - _z_mutex_unlock(&ztm->_mutex_rx); + _z_mutex_unlock(&ztm->_common._mutex_rx); return NULL; } z_result_t _zp_multicast_start_read_task(_z_transport_t *zt, z_task_attr_t *attr, _z_task_t *task) { // Init memory (void)memset(task, 0, sizeof(_z_task_t)); - zt->_transport._multicast._read_task_running = true; // Init before z_task_init for concurrency issue + zt->_transport._multicast._common._read_task_running = true; // Init before z_task_init for concurrency issue // Init task if (_z_task_init(task, attr, _zp_multicast_read_task, &zt->_transport._multicast) != _Z_RES_OK) { - zt->_transport._multicast._read_task_running = false; + zt->_transport._multicast._common._read_task_running = false; return _Z_ERR_SYSTEM_TASK_FAILED; } // Attach task - zt->_transport._multicast._read_task = task; + zt->_transport._multicast._common._read_task = task; return _Z_RES_OK; } z_result_t _zp_multicast_stop_read_task(_z_transport_t *zt) { - zt->_transport._multicast._read_task_running = false; + 
zt->_transport._multicast._common._read_task_running = false; return _Z_RES_OK; } #else diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 4fb823e46..d10d9709f 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -35,27 +35,28 @@ static z_result_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_t _Z_DEBUG(">> recv session msg"); z_result_t ret = _Z_RES_OK; - _z_multicast_rx_mutex_lock(ztm); + _z_transport_rx_mutex_lock(&ztm->_common); size_t to_read = 0; do { - switch (ztm->_link._cap._flow) { + switch (ztm->_common._link._cap._flow) { case Z_LINK_CAP_FLOW_STREAM: - if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); - if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_zbuf_compact(&ztm->_zbuf); + if (_z_zbuf_len(&ztm->_common._zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_link_recv_zbuf(&ztm->_common._link, &ztm->_common._zbuf, addr); + if (_z_zbuf_len(&ztm->_common._zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_zbuf_compact(&ztm->_common._zbuf); ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; break; } } // Get stream size - to_read = _z_read_stream_size(&ztm->_zbuf); + to_read = _z_read_stream_size(&ztm->_common._zbuf); // Read data - if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); - if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztm->_zbuf); + if (_z_zbuf_len(&ztm->_common._zbuf) < to_read) { + _z_link_recv_zbuf(&ztm->_common._link, &ztm->_common._zbuf, addr); + if (_z_zbuf_len(&ztm->_common._zbuf) < to_read) { + _z_zbuf_set_rpos(&ztm->_common._zbuf, + _z_zbuf_get_rpos(&ztm->_common._zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztm->_common._zbuf); ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; break; } @@ -63,8 +64,8 @@ static z_result_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_t break; // 
Datagram capable links case Z_LINK_CAP_FLOW_DATAGRAM: - _z_zbuf_compact(&ztm->_zbuf); - to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + _z_zbuf_compact(&ztm->_common._zbuf); + to_read = _z_link_recv_zbuf(&ztm->_common._link, &ztm->_common._zbuf, addr); if (to_read == SIZE_MAX) { ret = _Z_ERR_TRANSPORT_RX_FAILED; } @@ -75,10 +76,10 @@ static z_result_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_t } while (false); // The 1-iteration loop to use continue to break the entire loop on error if (ret == _Z_RES_OK) { - _Z_DEBUG(">> \t transport_message_decode: %ju", (uintmax_t)_z_zbuf_len(&ztm->_zbuf)); - ret = _z_transport_message_decode(t_msg, &ztm->_zbuf); + _Z_DEBUG(">> \t transport_message_decode: %ju", (uintmax_t)_z_zbuf_len(&ztm->_common._zbuf)); + ret = _z_transport_message_decode(t_msg, &ztm->_common._zbuf); } - _z_multicast_rx_mutex_unlock(ztm); + _z_transport_rx_mutex_unlock(&ztm->_common); return ret; } @@ -167,7 +168,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, zm->_reliability = _z_t_msg_get_reliability(t_msg); _z_msg_fix_mapping(zm, mapping); - _z_handle_network_message(ztm->_session, zm, mapping); + _z_handle_network_message(ztm->_common._session, zm, mapping); } break; @@ -239,7 +240,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, if (ret == _Z_RES_OK) { uint16_t mapping = entry->_peer_id; _z_msg_fix_mapping(&zm, mapping); - _z_handle_network_message(ztm->_session, &zm, mapping); + _z_handle_network_message(ztm->_common._session, &zm, mapping); } else { _Z_INFO("Failed to decode defragmented message"); ret = _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; @@ -364,20 +365,20 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, z_result_t _z_multicast_update_rx_buffer(_z_transport_multicast_t *ztm) { // Check if user or defragment buffer took ownership of buffer - if (_z_zbuf_get_ref_count(&ztm->_zbuf) != 1) { + if 
(_z_zbuf_get_ref_count(&ztm->_common._zbuf) != 1) { // Allocate a new buffer _z_zbuf_t new_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); if (_z_zbuf_capacity(&new_zbuf) != Z_BATCH_MULTICAST_SIZE) { return _Z_ERR_SYSTEM_OUT_OF_MEMORY; } // Recopy leftover bytes - size_t leftovers = _z_zbuf_len(&ztm->_zbuf); + size_t leftovers = _z_zbuf_len(&ztm->_common._zbuf); if (leftovers > 0) { - _z_zbuf_copy_bytes(&new_zbuf, &ztm->_zbuf); + _z_zbuf_copy_bytes(&new_zbuf, &ztm->_common._zbuf); } // Drop buffer & update - _z_zbuf_clear(&ztm->_zbuf); - ztm->_zbuf = new_zbuf; + _z_zbuf_clear(&ztm->_common._zbuf); + ztm->_common._zbuf = new_zbuf; } return _Z_RES_OK; } diff --git a/src/transport/multicast/transport.c b/src/transport/multicast/transport.c index 4ff1f3717..776d007fb 100644 --- a/src/transport/multicast/transport.c +++ b/src/transport/multicast/transport.c @@ -25,7 +25,6 @@ #include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/transport/multicast.h" #include "zenoh-pico/transport/multicast/rx.h" -#include "zenoh-pico/transport/multicast/tx.h" #include "zenoh-pico/transport/raweth/tx.h" #include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/transport/utils.h" @@ -42,7 +41,7 @@ z_result_t _z_multicast_transport_create(_z_transport_t *zt, _z_link_t *zl, case Z_LINK_CAP_TRANSPORT_MULTICAST: zt->_type = _Z_TRANSPORT_MULTICAST_TYPE; ztm = &zt->_transport._multicast; - ztm->_send_f = _z_multicast_send_t_msg; + ztm->_send_f = _z_transport_tx_send_t_msg; break; case Z_LINK_CAP_TRANSPORT_RAWETH: zt->_type = _Z_TRANSPORT_RAWETH_TYPE; @@ -55,23 +54,23 @@ z_result_t _z_multicast_transport_create(_z_transport_t *zt, _z_link_t *zl, // Initialize batching data #if Z_FEATURE_BATCHING == 1 - ztm->_batch_state = _Z_BATCHING_IDLE; - ztm->_batch = _z_network_message_vec_make(0); + ztm->_common._batch_state = _Z_BATCHING_IDLE; + ztm->_common._batch_count = 0; #endif #if Z_FEATURE_MULTI_THREAD == 1 // Initialize the mutexes - ret = _z_mutex_init(&ztm->_mutex_tx); + ret = 
_z_mutex_init(&ztm->_common._mutex_tx); if (ret == _Z_RES_OK) { - ret = _z_mutex_init(&ztm->_mutex_rx); + ret = _z_mutex_init(&ztm->_common._mutex_rx); if (ret == _Z_RES_OK) { ret = _z_mutex_init(&ztm->_mutex_peer); if (ret != _Z_RES_OK) { - _z_mutex_drop(&ztm->_mutex_tx); - _z_mutex_drop(&ztm->_mutex_rx); + _z_mutex_drop(&ztm->_common._mutex_tx); + _z_mutex_drop(&ztm->_common._mutex_rx); } } else { - _z_mutex_drop(&ztm->_mutex_tx); + _z_mutex_drop(&ztm->_common._mutex_tx); } } #endif // Z_FEATURE_MULTI_THREAD == 1 @@ -79,51 +78,52 @@ z_result_t _z_multicast_transport_create(_z_transport_t *zt, _z_link_t *zl, // Initialize the read and write buffers if (ret == _Z_RES_OK) { uint16_t mtu = (zl->_mtu < Z_BATCH_MULTICAST_SIZE) ? zl->_mtu : Z_BATCH_MULTICAST_SIZE; - ztm->_wbuf = _z_wbuf_make(mtu, false); - ztm->_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); + ztm->_common._wbuf = _z_wbuf_make(mtu, false); + ztm->_common._zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); // Clean up the buffers if one of them failed to be allocated - if ((_z_wbuf_capacity(&ztm->_wbuf) != mtu) || (_z_zbuf_capacity(&ztm->_zbuf) != Z_BATCH_MULTICAST_SIZE)) { + if ((_z_wbuf_capacity(&ztm->_common._wbuf) != mtu) || + (_z_zbuf_capacity(&ztm->_common._zbuf) != Z_BATCH_MULTICAST_SIZE)) { ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; _Z_ERROR("Not enough memory to allocate transport tx rx buffers!"); #if Z_FEATURE_MULTI_THREAD == 1 - _z_mutex_drop(&ztm->_mutex_tx); - _z_mutex_drop(&ztm->_mutex_rx); + _z_mutex_drop(&ztm->_common._mutex_tx); + _z_mutex_drop(&ztm->_common._mutex_rx); _z_mutex_drop(&ztm->_mutex_peer); #endif // Z_FEATURE_MULTI_THREAD == 1 - _z_wbuf_clear(&ztm->_wbuf); - _z_zbuf_clear(&ztm->_zbuf); + _z_wbuf_clear(&ztm->_common._wbuf); + _z_zbuf_clear(&ztm->_common._zbuf); } } if (ret == _Z_RES_OK) { // Set default SN resolution - ztm->_sn_res = _z_sn_max(param->_seq_num_res); + ztm->_common._sn_res = _z_sn_max(param->_seq_num_res); // The initial SN at TX side - ztm->_sn_tx_reliable = 
param->_initial_sn_tx._val._plain._reliable; - ztm->_sn_tx_best_effort = param->_initial_sn_tx._val._plain._best_effort; + ztm->_common._sn_tx_reliable = param->_initial_sn_tx._val._plain._reliable; + ztm->_common._sn_tx_best_effort = param->_initial_sn_tx._val._plain._best_effort; // Initialize peer list ztm->_peers = _z_transport_peer_entry_list_new(); #if Z_FEATURE_MULTI_THREAD == 1 // Tasks - ztm->_read_task_running = false; - ztm->_read_task = NULL; - ztm->_lease_task_running = false; - ztm->_lease_task = NULL; + ztm->_common._read_task_running = false; + ztm->_common._read_task = NULL; + ztm->_common._lease_task_running = false; + ztm->_common._lease_task = NULL; #endif // Z_FEATURE_MULTI_THREAD == 1 - ztm->_lease = Z_TRANSPORT_LEASE; + ztm->_common._lease = Z_TRANSPORT_LEASE; // Notifiers - ztm->_transmitted = false; + ztm->_common._transmitted = false; // Transport link for multicast - ztm->_link = *zl; + ztm->_common._link = *zl; } return ret; } @@ -179,7 +179,7 @@ z_result_t _z_multicast_send_close(_z_transport_multicast_t *ztm, uint8_t reason z_result_t ret = _Z_RES_OK; // Send and clear message _z_transport_message_t cm = _z_t_msg_make_close(reason, link_only); - ret = ztm->_send_f(ztm, &cm); + ret = ztm->_send_f(&ztm->_common, &cm); _z_t_msg_clear(&cm); return ret; } @@ -192,31 +192,27 @@ void _z_multicast_transport_clear(_z_transport_t *zt) { _z_transport_multicast_t *ztm = &zt->_transport._multicast; #if Z_FEATURE_MULTI_THREAD == 1 // Clean up tasks - if (ztm->_read_task != NULL) { - _z_task_join(ztm->_read_task); - _z_task_free(&ztm->_read_task); + if (ztm->_common._read_task != NULL) { + _z_task_join(ztm->_common._read_task); + _z_task_free(&ztm->_common._read_task); } - if (ztm->_lease_task != NULL) { - _z_task_join(ztm->_lease_task); - _z_task_free(&ztm->_lease_task); + if (ztm->_common._lease_task != NULL) { + _z_task_join(ztm->_common._lease_task); + _z_task_free(&ztm->_common._lease_task); } // Clean up the mutexes - 
_z_mutex_drop(&ztm->_mutex_tx); - _z_mutex_drop(&ztm->_mutex_rx); + _z_mutex_drop(&ztm->_common._mutex_tx); + _z_mutex_drop(&ztm->_common._mutex_rx); _z_mutex_drop(&ztm->_mutex_peer); #endif // Z_FEATURE_MULTI_THREAD == 1 -#if Z_FEATURE_BATCHING == 1 - _z_network_message_vec_clear(&ztm->_batch); -#endif - // Clean up the buffers - _z_wbuf_clear(&ztm->_wbuf); - _z_zbuf_clear(&ztm->_zbuf); + _z_wbuf_clear(&ztm->_common._wbuf); + _z_zbuf_clear(&ztm->_common._zbuf); // Clean up peer list _z_transport_peer_entry_list_free(&ztm->_peers); - _z_link_clear(&ztm->_link); + _z_link_clear(&ztm->_common._link); } #else diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c deleted file mode 100644 index 572134c11..000000000 --- a/src/transport/multicast/tx.c +++ /dev/null @@ -1,287 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#include "zenoh-pico/transport/common/tx.h" - -#include "zenoh-pico/config.h" -#include "zenoh-pico/protocol/codec/network.h" -#include "zenoh-pico/protocol/codec/transport.h" -#include "zenoh-pico/transport/multicast/transport.h" -#include "zenoh-pico/transport/multicast/tx.h" -#include "zenoh-pico/transport/utils.h" -#include "zenoh-pico/utils/logging.h" - -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - -/** - * This function is unsafe because it operates in potentially concurrent data. 
- * Make sure that the following mutexes are locked before calling this function: - * - ztm->_mutex_inner - */ -static _z_zint_t __unsafe_z_multicast_get_sn(_z_transport_multicast_t *ztm, z_reliability_t reliability) { - _z_zint_t sn; - if (reliability == Z_RELIABILITY_RELIABLE) { - sn = ztm->_sn_tx_reliable; - ztm->_sn_tx_reliable = _z_sn_increment(ztm->_sn_res, ztm->_sn_tx_reliable); - } else { - sn = ztm->_sn_tx_best_effort; - ztm->_sn_tx_best_effort = _z_sn_increment(ztm->_sn_res, ztm->_sn_tx_best_effort); - } - return sn; -} - -#if Z_FEATURE_FRAGMENTATION == 1 -static z_result_t __unsafe_z_multicast_send_fragment(_z_transport_multicast_t *ztm, _z_wbuf_t *fbf, - const _z_network_message_t *n_msg, z_reliability_t reliability, - _z_zint_t first_sn) { - bool is_first = true; - _z_zint_t sn = first_sn; - // Encode message on temp buffer - _Z_RETURN_IF_ERR(_z_network_message_encode(fbf, n_msg)); - // Fragment message - while (_z_wbuf_len(fbf) > 0) { - // Get fragment sequence number - if (!is_first) { - sn = __unsafe_z_multicast_get_sn(ztm, reliability); - } - is_first = false; - // Serialize fragment - __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); - z_result_t ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, fbf, reliability, sn); - if (ret != _Z_RES_OK) { - _Z_ERROR("Fragment serialization failed with err %d", ret); - return ret; - } - // Send fragment - __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); - _Z_RETURN_IF_ERR(_z_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); - ztm->_transmitted = true; // Tell session we have transmitted data - } - return _Z_RES_OK; -} -#else -static z_result_t __unsafe_z_multicast_send_fragment(_z_transport_multicast_t *ztm, _z_wbuf_t *fbf, - const _z_network_message_t *n_msg, z_reliability_t reliability, - _z_zint_t first_sn) { - _ZP_UNUSED(ztm); - _ZP_UNUSED(fbf); - _ZP_UNUSED(n_msg); - _ZP_UNUSED(reliability); - _ZP_UNUSED(first_sn); - _Z_INFO("Sending the message required fragmentation feature that is 
deactivated."); - return _Z_RES_OK; -} -#endif // Z_FEATURE_FRAGMENTATION == 1 - -static z_result_t __unsafe_z_multicast_message_send(_z_transport_multicast_t *ztm, const _z_network_message_t *n_msg, - z_reliability_t reliability) { - _Z_DEBUG("Send network message"); - // Encode frame header - __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); - _z_zint_t sn = __unsafe_z_multicast_get_sn(ztm, reliability); - _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); - _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg)); - // Encode network message - z_result_t ret = _z_network_message_encode(&ztm->_wbuf, n_msg); - // The message does not fit in the current batch, let's fragment it - if (ret != _Z_RES_OK) { - // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); - // Send message as fragments - ret = __unsafe_z_multicast_send_fragment(ztm, &fbf, n_msg, reliability, sn); - // Clear the buffer as it's no longer required - _z_wbuf_clear(&fbf); - return ret; - } - // Send network message - __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); - _Z_RETURN_IF_ERR(_z_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); - ztm->_transmitted = true; // Tell session we transmitted data - return _Z_RES_OK; -} - -#if Z_FEATURE_BATCHING == 1 -static z_result_t __unsafe_z_multicast_message_batch(_z_transport_multicast_t *ztm, const _z_network_message_t *n_msg) { - _Z_DEBUG("Batching network message"); - // Copy network message - _z_network_message_t *batch_msg = z_malloc(sizeof(_z_network_message_t)); - if (batch_msg == NULL) { - return _Z_ERR_SYSTEM_OUT_OF_MEMORY; - } - if (_z_n_msg_copy(batch_msg, n_msg) != _Z_RES_OK) { - z_free(batch_msg); - return _Z_ERR_SYSTEM_OUT_OF_MEMORY; - } - _z_network_message_vec_append(&ztm->_batch, batch_msg); - return _Z_RES_OK; -} - -static z_result_t __unsafe_multicast_batch_send(_z_transport_multicast_t *ztm, z_reliability_t reliability) { - z_result_t 
ret = _Z_RES_OK; - // Get network message number - size_t msg_nb = _z_network_message_vec_len(&ztm->_batch); - size_t msg_idx = 0; - size_t curr_msg_nb = 0; - if (msg_nb == 0) { - return _Z_RES_OK; - } - // Encode the frame header - __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); - _z_zint_t sn = __unsafe_z_multicast_get_sn(ztm, reliability); - _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); - _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg)); - size_t curr_wpos = _z_wbuf_get_wpos(&ztm->_wbuf); - // Process batch - while (msg_idx < msg_nb) { - // Encode a network message - _z_network_message_t *n_msg = _z_network_message_vec_get(&ztm->_batch, msg_idx); - assert(n_msg != NULL); - if (_z_network_message_encode(&ztm->_wbuf, n_msg) != _Z_RES_OK) { - // Remove partially encoded data - _z_wbuf_set_wpos(&ztm->_wbuf, curr_wpos); - // Handle case where one message is too big to fit in frame - if (curr_msg_nb == 0) { - _Z_INFO("Batch sending interrupted by a message needing to be fragmented."); - // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); - // Send message as fragments - ret = __unsafe_z_multicast_send_fragment(ztm, &fbf, n_msg, reliability, sn); - // Clear the buffer as it's no longer required - _z_wbuf_clear(&fbf); - if (ret != _Z_RES_OK) { - _Z_ERROR("Send fragmented message failed with err %d.", ret); - } - // Message is sent or skipped - msg_idx++; - } else { // Frame has messages but is full - _Z_INFO("Sending batch in multiple frames because it is too big for one"); - // Send frame - __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); - _Z_RETURN_IF_ERR(_z_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); - ztm->_transmitted = true; - curr_msg_nb = 0; - } - // Reset frame - __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); - sn = __unsafe_z_multicast_get_sn(ztm, reliability); - t_msg = _z_t_msg_make_frame_header(sn, reliability); - 
_Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg)); - } else { - curr_wpos = _z_wbuf_get_wpos(&ztm->_wbuf); - msg_idx++; - curr_msg_nb++; - } - } - // Send frame - __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); - _Z_RETURN_IF_ERR(_z_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); - ztm->_transmitted = true; // Tell session we transmitted data - return ret; -} -#endif - -z_result_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg) { - z_result_t ret = _Z_RES_OK; - _Z_DEBUG("Send session message"); - _z_multicast_tx_mutex_lock(ztm, true); - - // Encode transport message - __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); - ret = _z_transport_message_encode(&ztm->_wbuf, t_msg); - if (ret == _Z_RES_OK) { - // Send message - __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); - ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); - if (ret == _Z_RES_OK) { - ztm->_transmitted = true; // Tell session we transmitted data - } - } - _z_multicast_tx_mutex_unlock(ztm); - return ret; -} - -z_result_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, z_reliability_t reliability, - z_congestion_control_t cong_ctrl) { - z_result_t ret = _Z_RES_OK; - _z_transport_multicast_t *ztm = &zn->_tp._transport._multicast; - - // Acquire the lock and drop the message if needed - ret = _z_multicast_tx_mutex_lock(ztm, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK); - if (ret != _Z_RES_OK) { - _Z_INFO("Dropping zenoh message because of congestion control"); - return ret; - } - // Process batching -#if Z_FEATURE_BATCHING == 1 - if (ztm->_batch_state == _Z_BATCHING_ACTIVE) { - ret = __unsafe_z_multicast_message_batch(ztm, n_msg); - } else { - ret = __unsafe_z_multicast_message_send(ztm, n_msg, reliability); - } -#else - ret = __unsafe_z_multicast_message_send(ztm, n_msg, reliability); -#endif - _z_multicast_tx_mutex_unlock(ztm); - return ret; -} - -z_result_t 
_z_multicast_send_n_batch(_z_session_t *zn, z_reliability_t reliability, z_congestion_control_t cong_ctrl) { -#if Z_FEATURE_BATCHING == 1 - _Z_DEBUG("Send network batch"); - _z_transport_multicast_t *ztm = &zn->_tp._transport._multicast; - // Acquire the lock and drop the message if needed - z_result_t ret = _z_multicast_tx_mutex_lock(ztm, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK); - if (ret != _Z_RES_OK) { - _Z_INFO("Dropping zenoh batch because of congestion control"); - return ret; - } - // Send batch - ret = __unsafe_multicast_batch_send(ztm, reliability); - // Clean up - _z_network_message_vec_clear(&ztm->_batch); - _z_multicast_tx_mutex_unlock(ztm); - return ret; -#else - _ZP_UNUSED(zn); - _ZP_UNUSED(reliability); - _ZP_UNUSED(cong_ctrl); - _Z_ERROR("Tried to send batch but batching feature is deactivated."); - return _Z_ERR_TRANSPORT_TX_FAILED; -#endif -} - -#else -z_result_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg) { - _ZP_UNUSED(ztm); - _ZP_UNUSED(t_msg); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -z_result_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, z_reliability_t reliability, - z_congestion_control_t cong_ctrl) { - _ZP_UNUSED(zn); - _ZP_UNUSED(n_msg); - _ZP_UNUSED(reliability); - _ZP_UNUSED(cong_ctrl); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -z_result_t _z_multicast_send_n_batch(_z_session_t *zn, z_reliability_t reliability, z_congestion_control_t cong_ctrl) { - _ZP_UNUSED(zn); - _ZP_UNUSED(reliability); - _ZP_UNUSED(cong_ctrl); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 diff --git a/src/transport/raweth/read.c b/src/transport/raweth/read.c index 4c49d7b64..aa9995562 100644 --- a/src/transport/raweth/read.c +++ b/src/transport/raweth/read.c @@ -59,7 +59,7 @@ void *_zp_raweth_read_task(void *ztm_arg) { _z_slice_t addr = _z_slice_alias_buf(NULL, 0); // Task loop - while (ztm->_read_task_running == true) { + while 
(ztm->_common._read_task_running == true) { // Read message from link z_result_t ret = _z_raweth_recv_t_msg(ztm, &t_msg, &addr); switch (ret) { @@ -74,7 +74,7 @@ void *_zp_raweth_read_task(void *ztm_arg) { default: // Drop message & stop task _Z_ERROR("Connection closed due to malformed message: %d", ret); - ztm->_read_task_running = false; + ztm->_common._read_task_running = false; _z_slice_clear(&addr); continue; break; @@ -83,7 +83,7 @@ void *_zp_raweth_read_task(void *ztm_arg) { ret = _z_multicast_handle_transport_message(ztm, &t_msg, &addr); if (ret != _Z_RES_OK) { _Z_ERROR("Connection closed due to message processing error: %d", ret); - ztm->_read_task_running = false; + ztm->_common._read_task_running = false; _z_slice_clear(&addr); continue; } @@ -91,7 +91,7 @@ void *_zp_raweth_read_task(void *ztm_arg) { _z_slice_clear(&addr); if (_z_raweth_update_rx_buff(ztm) != _Z_RES_OK) { _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); - ztm->_read_task_running = false; + ztm->_common._read_task_running = false; } } return NULL; @@ -100,19 +100,19 @@ void *_zp_raweth_read_task(void *ztm_arg) { z_result_t _zp_raweth_start_read_task(_z_transport_t *zt, z_task_attr_t *attr, _z_task_t *task) { // Init memory (void)memset(task, 0, sizeof(_z_task_t)); - zt->_transport._unicast._lease_task_running = true; // Init before z_task_init for concurrency issue + zt->_transport._unicast._common._lease_task_running = true; // Init before z_task_init for concurrency issue // Init task if (_z_task_init(task, attr, _zp_raweth_read_task, &zt->_transport._raweth) != _Z_RES_OK) { - zt->_transport._unicast._lease_task_running = false; + zt->_transport._unicast._common._lease_task_running = false; return _Z_ERR_SYSTEM_TASK_FAILED; } // Attach task - zt->_transport._raweth._read_task = task; + zt->_transport._raweth._common._read_task = task; return _Z_RES_OK; } z_result_t _zp_raweth_stop_read_task(_z_transport_t *zt) { - zt->_transport._raweth._read_task_running = 
false; + zt->_transport._raweth._common._read_task_running = false; return _Z_RES_OK; } #else diff --git a/src/transport/raweth/rx.c b/src/transport/raweth/rx.c index 296bbe5fb..389b57c70 100644 --- a/src/transport/raweth/rx.c +++ b/src/transport/raweth/rx.c @@ -78,16 +78,16 @@ z_result_t _z_raweth_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_m _Z_DEBUG(">> recv session msg"); z_result_t ret = _Z_RES_OK; - _z_multicast_rx_mutex_lock(ztm); + _z_transport_rx_mutex_lock(&ztm->_common); // Prepare the buffer - _z_zbuf_reset(&ztm->_zbuf); + _z_zbuf_reset(&ztm->_common._zbuf); - switch (ztm->_link._cap._flow) { + switch (ztm->_common._link._cap._flow) { // Datagram capable links case Z_LINK_CAP_FLOW_DATAGRAM: { - _z_zbuf_compact(&ztm->_zbuf); + _z_zbuf_compact(&ztm->_common._zbuf); // Read from link - size_t to_read = _z_raweth_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + size_t to_read = _z_raweth_link_recv_zbuf(&ztm->_common._link, &ztm->_common._zbuf, addr); if (to_read == SIZE_MAX) { ret = _Z_ERR_TRANSPORT_RX_FAILED; } @@ -99,10 +99,10 @@ z_result_t _z_raweth_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_m } // Decode message if (ret == _Z_RES_OK) { - _Z_DEBUG(">> \t transport_message_decode: %ju", (uintmax_t)_z_zbuf_len(&ztm->_zbuf)); - ret = _z_transport_message_decode(t_msg, &ztm->_zbuf); + _Z_DEBUG(">> \t transport_message_decode: %ju", (uintmax_t)_z_zbuf_len(&ztm->_common._zbuf)); + ret = _z_transport_message_decode(t_msg, &ztm->_common._zbuf); } - _z_multicast_rx_mutex_unlock(ztm); + _z_transport_rx_mutex_unlock(&ztm->_common); return ret; } @@ -112,20 +112,20 @@ z_result_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_mess z_result_t _z_raweth_update_rx_buff(_z_transport_multicast_t *ztm) { // Check if user or defragment buffer took ownership of buffer - if (_z_zbuf_get_ref_count(&ztm->_zbuf) != 1) { + if (_z_zbuf_get_ref_count(&ztm->_common._zbuf) != 1) { // Allocate a new buffer _z_zbuf_t new_zbuf = 
_z_zbuf_make(Z_BATCH_MULTICAST_SIZE); if (_z_zbuf_capacity(&new_zbuf) != Z_BATCH_MULTICAST_SIZE) { return _Z_ERR_SYSTEM_OUT_OF_MEMORY; } // Recopy leftover bytes - size_t leftovers = _z_zbuf_len(&ztm->_zbuf); + size_t leftovers = _z_zbuf_len(&ztm->_common._zbuf); if (leftovers > 0) { - _z_zbuf_copy_bytes(&new_zbuf, &ztm->_zbuf); + _z_zbuf_copy_bytes(&new_zbuf, &ztm->_common._zbuf); } // Drop buffer & update - _z_zbuf_clear(&ztm->_zbuf); - ztm->_zbuf = new_zbuf; + _z_zbuf_clear(&ztm->_common._zbuf); + ztm->_common._zbuf = new_zbuf; } return _Z_RES_OK; } diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index 87b5693ab..e44d5bdab 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -87,11 +87,11 @@ static z_result_t _zp_raweth_set_socket(const _z_keyexpr_t *keyexpr, _z_raweth_s static _z_zint_t __unsafe_z_raweth_get_sn(_z_transport_multicast_t *ztm, z_reliability_t reliability) { _z_zint_t sn; if (reliability == Z_RELIABILITY_RELIABLE) { - sn = ztm->_sn_tx_reliable; - ztm->_sn_tx_reliable = _z_sn_increment(ztm->_sn_res, ztm->_sn_tx_reliable); + sn = ztm->_common._sn_tx_reliable; + ztm->_common._sn_tx_reliable = _z_sn_increment(ztm->_common._sn_res, ztm->_common._sn_tx_reliable); } else { - sn = ztm->_sn_tx_best_effort; - ztm->_sn_tx_best_effort = _z_sn_increment(ztm->_sn_res, ztm->_sn_tx_best_effort); + sn = ztm->_common._sn_tx_best_effort; + ztm->_common._sn_tx_best_effort = _z_sn_increment(ztm->_common._sn_res, ztm->_common._sn_tx_best_effort); } return sn; } @@ -187,26 +187,26 @@ z_result_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_mes return ret; } -z_result_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg) { +z_result_t _z_raweth_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg) { z_result_t ret = _Z_RES_OK; _Z_DEBUG(">> send session message"); - _z_multicast_tx_mutex_lock(ztm, true); + _z_transport_tx_mutex_lock(ztc, true); // Reset 
wbuf - _z_wbuf_reset(&ztm->_wbuf); + _z_wbuf_reset(&ztc->_wbuf); // Set socket info - _Z_CLEAN_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztm->_link._socket._raweth), _z_multicast_tx_mutex_unlock(ztm)); + _Z_CLEAN_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztc->_link._socket._raweth), _z_transport_tx_mutex_unlock(ztc)); // Prepare buff - __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); + __unsafe_z_raweth_prepare_header(&ztc->_link, &ztc->_wbuf); // Encode the session message - _Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg), _z_multicast_tx_mutex_unlock(ztm)); + _Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztc->_wbuf, t_msg), _z_transport_tx_mutex_unlock(ztc)); // Write the message header - _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), _z_multicast_tx_mutex_unlock(ztm)); + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztc->_link, &ztc->_wbuf), _z_transport_tx_mutex_unlock(ztc)); // Send the wbuf on the socket - _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _z_multicast_tx_mutex_unlock(ztm)); + _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztc->_link, &ztc->_wbuf), _z_transport_tx_mutex_unlock(ztc)); // Mark the session that we have transmitted data - ztm->_transmitted = true; - _z_multicast_tx_mutex_unlock(ztm); + ztc->_transmitted = true; + _z_transport_tx_mutex_unlock(ztc); return ret; } @@ -217,7 +217,7 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ _Z_DEBUG(">> send network message"); // Acquire the lock and drop the message if needed - ret = _z_multicast_tx_mutex_lock(ztm, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK); + ret = _z_transport_tx_mutex_lock(&ztm->_common, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK); if (ret != _Z_RES_OK) { _Z_INFO("Dropping zenoh message because of congestion control"); return ret; @@ -239,32 +239,34 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ 
break; } // Reset wbuf - _z_wbuf_reset(&ztm->_wbuf); + _z_wbuf_reset(&ztm->_common._wbuf); // Set socket info - _Z_CLEAN_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_link._socket._raweth), - _z_multicast_tx_mutex_unlock(ztm)); + _Z_CLEAN_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_common._link._socket._raweth), + _z_transport_tx_mutex_unlock(&ztm->_common)); // Prepare buff - __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); + __unsafe_z_raweth_prepare_header(&ztm->_common._link, &ztm->_common._wbuf); // Set the frame header _z_zint_t sn = __unsafe_z_raweth_get_sn(ztm, reliability); _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); // Encode the frame header - _Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg), _z_multicast_tx_mutex_unlock(ztm)); + _Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_common._wbuf, &t_msg), + _z_transport_tx_mutex_unlock(&ztm->_common)); // Encode the network message - if (_z_network_message_encode(&ztm->_wbuf, n_msg) == _Z_RES_OK) { + if (_z_network_message_encode(&ztm->_common._wbuf, n_msg) == _Z_RES_OK) { // Write the eth header - _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), - _z_multicast_tx_mutex_unlock(ztm)); + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_common._link, &ztm->_common._wbuf), + _z_transport_tx_mutex_unlock(&ztm->_common)); // Send the wbuf on the socket - _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _z_multicast_tx_mutex_unlock(ztm)); + _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_common._link, &ztm->_common._wbuf), + _z_transport_tx_mutex_unlock(&ztm->_common)); // Mark the session that we have transmitted data - ztm->_transmitted = true; + ztm->_common._transmitted = true; } else { // The message does not fit in the current batch, let's fragment it #if Z_FEATURE_FRAGMENTATION == 1 // Create an expandable wbuf for fragmentation _z_wbuf_t fbf = 
_z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); // Encode the message on the expandable wbuf - _Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _z_multicast_tx_mutex_unlock(ztm)); + _Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _z_transport_tx_mutex_unlock(&ztm->_common)); // Fragment and send the message bool is_first = true; while (_z_wbuf_len(&fbf) > 0) { @@ -274,20 +276,20 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ } is_first = false; // Reset wbuf - _z_wbuf_reset(&ztm->_wbuf); + _z_wbuf_reset(&ztm->_common._wbuf); // Prepare buff - __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); + __unsafe_z_raweth_prepare_header(&ztm->_common._link, &ztm->_common._wbuf); // Serialize one fragment - _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn), - _z_multicast_tx_mutex_unlock(ztm)); + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_common._wbuf, &fbf, reliability, sn), + _z_transport_tx_mutex_unlock(&ztm->_common)); // Write the eth header - _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), - _z_multicast_tx_mutex_unlock(ztm)); + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_common._link, &ztm->_common._wbuf), + _z_transport_tx_mutex_unlock(&ztm->_common)); // Send the wbuf on the socket - _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), - _z_multicast_tx_mutex_unlock(ztm)); + _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_common._link, &ztm->_common._wbuf), + _z_transport_tx_mutex_unlock(&ztm->_common)); // Mark the session that we have transmitted data - ztm->_transmitted = true; + ztm->_common._transmitted = true; } // Clear the expandable buffer _z_wbuf_clear(&fbf); @@ -295,9 +297,7 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ _Z_INFO("Sending the message required fragmentation feature that is deactivated."); 
#endif } -#if Z_FEATURE_MULTI_THREAD == 1 - _z_mutex_unlock(&ztm->_mutex_tx); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _z_transport_tx_mutex_unlock(&ztm->_common); return ret; } @@ -307,8 +307,8 @@ z_result_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_mes _ZP_UNUSED(t_msg); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } -z_result_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg) { - _ZP_UNUSED(ztm); +z_result_t _z_raweth_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg) { + _ZP_UNUSED(ztc); _ZP_UNUSED(t_msg); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } diff --git a/src/transport/transport.c b/src/transport/transport.c index 28cc1eb48..2540ec5ac 100644 --- a/src/transport/transport.c +++ b/src/transport/transport.c @@ -22,13 +22,11 @@ #include "zenoh-pico/link/link.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/transport/multicast/rx.h" -#include "zenoh-pico/transport/multicast/tx.h" #include "zenoh-pico/transport/raweth/rx.h" #include "zenoh-pico/transport/raweth/tx.h" #include "zenoh-pico/transport/transport.h" #include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/transport/unicast/transport.h" -#include "zenoh-pico/transport/unicast/tx.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" @@ -81,15 +79,19 @@ void _z_transport_free(_z_transport_t **zt) { #if Z_FEATURE_BATCHING == 1 bool _z_transport_start_batching(_z_transport_t *zt) { uint8_t *batch_state = NULL; + size_t *batch_count = NULL; switch (zt->_type) { case _Z_TRANSPORT_UNICAST_TYPE: - batch_state = &zt->_transport._unicast._batch_state; + batch_state = &zt->_transport._unicast._common._batch_state; + batch_count = &zt->_transport._unicast._common._batch_count; break; case _Z_TRANSPORT_MULTICAST_TYPE: - batch_state = &zt->_transport._multicast._batch_state; + batch_state = &zt->_transport._multicast._common._batch_state; + batch_count = 
&zt->_transport._multicast._common._batch_count; break; case _Z_TRANSPORT_RAWETH_TYPE: - batch_state = &zt->_transport._raweth._batch_state; + batch_state = &zt->_transport._raweth._common._batch_state; + batch_count = &zt->_transport._raweth._common._batch_count; break; default: break; @@ -97,6 +99,7 @@ bool _z_transport_start_batching(_z_transport_t *zt) { if (*batch_state == _Z_BATCHING_ACTIVE) { return false; } + *batch_count = 0; *batch_state = _Z_BATCHING_ACTIVE; return true; } @@ -105,13 +108,13 @@ void _z_transport_stop_batching(_z_transport_t *zt) { uint8_t *batch_state = NULL; switch (zt->_type) { case _Z_TRANSPORT_UNICAST_TYPE: - batch_state = &zt->_transport._unicast._batch_state; + batch_state = &zt->_transport._unicast._common._batch_state; break; case _Z_TRANSPORT_MULTICAST_TYPE: - batch_state = &zt->_transport._multicast._batch_state; + batch_state = &zt->_transport._multicast._common._batch_state; break; case _Z_TRANSPORT_RAWETH_TYPE: - batch_state = &zt->_transport._raweth._batch_state; + batch_state = &zt->_transport._raweth._common._batch_state; break; default: break; diff --git a/src/transport/unicast.c b/src/transport/unicast.c index dca9ae0ea..3c771a126 100644 --- a/src/transport/unicast.c +++ b/src/transport/unicast.c @@ -26,7 +26,6 @@ #include "zenoh-pico/transport/unicast/lease.h" #include "zenoh-pico/transport/unicast/read.h" #include "zenoh-pico/transport/unicast/rx.h" -#include "zenoh-pico/transport/unicast/tx.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" diff --git a/src/transport/unicast/lease.c b/src/transport/unicast/lease.c index 1e3da00cd..c4cd362f1 100644 --- a/src/transport/unicast/lease.c +++ b/src/transport/unicast/lease.c @@ -16,8 +16,8 @@ #include "zenoh-pico/session/query.h" #include "zenoh-pico/session/utils.h" +#include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/transport/unicast/transport.h" -#include "zenoh-pico/transport/unicast/tx.h" #include "zenoh-pico/utils/logging.h" 
#if Z_FEATURE_UNICAST_TRANSPORT == 1 @@ -26,7 +26,7 @@ z_result_t _zp_unicast_send_keep_alive(_z_transport_unicast_t *ztu) { z_result_t ret = _Z_RES_OK; _z_transport_message_t t_msg = _z_t_msg_make_keep_alive(); - ret = _z_unicast_send_t_msg(ztu, &t_msg); + ret = _z_transport_tx_send_t_msg(&ztu->_common, &t_msg); return ret; } @@ -44,11 +44,11 @@ void *_zp_unicast_lease_task(void *ztu_arg) { _z_transport_unicast_t *ztu = (_z_transport_unicast_t *)ztu_arg; ztu->_received = false; - ztu->_transmitted = false; + ztu->_common._transmitted = false; - int next_lease = (int)ztu->_lease; - int next_keep_alive = (int)(ztu->_lease / Z_TRANSPORT_LEASE_EXPIRE_FACTOR); - while (ztu->_lease_task_running == true) { + int next_lease = (int)ztu->_common._lease; + int next_keep_alive = (int)(ztu->_common._lease / Z_TRANSPORT_LEASE_EXPIRE_FACTOR); + while (ztu->_common._lease_task_running == true) { // Next lease process if (next_lease <= 0) { // Check if received data @@ -56,27 +56,27 @@ void *_zp_unicast_lease_task(void *ztu_arg) { // Reset the lease parameters ztu->_received = false; } else { - _Z_INFO("Closing session because it has expired after %zums", ztu->_lease); - ztu->_lease_task_running = false; + _Z_INFO("Closing session because it has expired after %zums", ztu->_common._lease); + ztu->_common._lease_task_running = false; _z_unicast_transport_close(ztu, _Z_CLOSE_EXPIRED); break; } - next_lease = (int)ztu->_lease; + next_lease = (int)ztu->_common._lease; } // Next keep alive process if (next_keep_alive <= 0) { // Check if need to send a keep alive - if (ztu->_transmitted == false) { + if (ztu->_common._transmitted == false) { if (_zp_unicast_send_keep_alive(ztu) < 0) { _Z_INFO("Send keep alive failed."); } } // Reset the keep alive parameters - ztu->_transmitted = false; - next_keep_alive = (int)(ztu->_lease / Z_TRANSPORT_LEASE_EXPIRE_FACTOR); + ztu->_common._transmitted = false; + next_keep_alive = (int)(ztu->_common._lease / Z_TRANSPORT_LEASE_EXPIRE_FACTOR); } // Query 
timeout process - _z_pending_query_process_timeout(_Z_RC_IN_VAL(ztu->_session)); + _z_pending_query_process_timeout(_Z_RC_IN_VAL(ztu->_common._session)); // Compute the target interval int interval; @@ -101,18 +101,18 @@ void *_zp_unicast_lease_task(void *ztu_arg) { z_result_t _zp_unicast_start_lease_task(_z_transport_t *zt, z_task_attr_t *attr, _z_task_t *task) { // Init memory (void)memset(task, 0, sizeof(_z_task_t)); - zt->_transport._unicast._lease_task_running = true; // Init before z_task_init for concurrency issue + zt->_transport._unicast._common._lease_task_running = true; // Init before z_task_init for concurrency issue // Init task if (_z_task_init(task, attr, _zp_unicast_lease_task, &zt->_transport._unicast) != _Z_RES_OK) { return _Z_ERR_SYSTEM_TASK_FAILED; } // Attach task - zt->_transport._unicast._lease_task = task; + zt->_transport._unicast._common._lease_task = task; return _Z_RES_OK; } z_result_t _zp_unicast_stop_lease_task(_z_transport_t *zt) { - zt->_transport._unicast._lease_task_running = false; + zt->_transport._unicast._common._lease_task_running = false; return _Z_RES_OK; } #else diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index 738de4c02..ab347251d 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -53,38 +53,39 @@ void *_zp_unicast_read_task(void *ztu_arg) { _z_transport_unicast_t *ztu = (_z_transport_unicast_t *)ztu_arg; // Acquire and keep the lock - _z_mutex_lock(&ztu->_mutex_rx); + _z_mutex_lock(&ztu->_common._mutex_rx); // Prepare the buffer - _z_zbuf_reset(&ztu->_zbuf); + _z_zbuf_reset(&ztu->_common._zbuf); - while (ztu->_read_task_running == true) { + while (ztu->_common._read_task_running == true) { // Read bytes from socket to the main buffer size_t to_read = 0; - switch (ztu->_link._cap._flow) { + switch (ztu->_common._link._cap._flow) { case Z_LINK_CAP_FLOW_STREAM: - if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, 
NULL); - if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_zbuf_compact(&ztu->_zbuf); + if (_z_zbuf_len(&ztu->_common._zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_link_recv_zbuf(&ztu->_common._link, &ztu->_common._zbuf, NULL); + if (_z_zbuf_len(&ztu->_common._zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_zbuf_compact(&ztu->_common._zbuf); continue; } } // Get stream size - to_read = _z_read_stream_size(&ztu->_zbuf); + to_read = _z_read_stream_size(&ztu->_common._zbuf); // Read data - if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); - if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztu->_zbuf); + if (_z_zbuf_len(&ztu->_common._zbuf) < to_read) { + _z_link_recv_zbuf(&ztu->_common._link, &ztu->_common._zbuf, NULL); + if (_z_zbuf_len(&ztu->_common._zbuf) < to_read) { + _z_zbuf_set_rpos(&ztu->_common._zbuf, + _z_zbuf_get_rpos(&ztu->_common._zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztu->_common._zbuf); continue; } } break; case Z_LINK_CAP_FLOW_DATAGRAM: - _z_zbuf_compact(&ztu->_zbuf); - to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + _z_zbuf_compact(&ztu->_common._zbuf); + to_read = _z_link_recv_zbuf(&ztu->_common._link, &ztu->_common._zbuf, NULL); if (to_read == SIZE_MAX) { continue; } @@ -93,7 +94,7 @@ void *_zp_unicast_read_task(void *ztu_arg) { break; } // Wrap the main buffer for to_read bytes - _z_zbuf_t zbuf = _z_zbuf_view(&ztu->_zbuf, to_read); + _z_zbuf_t zbuf = _z_zbuf_view(&ztu->_common._zbuf, to_read); // Mark the session that we have received data ztu->_received = true; @@ -111,43 +112,43 @@ void *_zp_unicast_read_task(void *ztu_arg) { if (ret != _Z_ERR_CONNECTION_CLOSED) { _Z_ERROR("Connection closed due to message processing error: %d", ret); } - ztu->_read_task_running = false; + ztu->_common._read_task_running = false; continue; } } else { _Z_ERROR("Connection closed due to malformed message: %d", 
ret); - ztu->_read_task_running = false; + ztu->_common._read_task_running = false; continue; } } // Move the read position of the read buffer - _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) + to_read); + _z_zbuf_set_rpos(&ztu->_common._zbuf, _z_zbuf_get_rpos(&ztu->_common._zbuf) + to_read); if (_z_unicast_update_rx_buffer(ztu) != _Z_RES_OK) { _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); - ztu->_read_task_running = false; + ztu->_common._read_task_running = false; } } - _z_mutex_unlock(&ztu->_mutex_rx); + _z_mutex_unlock(&ztu->_common._mutex_rx); return NULL; } z_result_t _zp_unicast_start_read_task(_z_transport_t *zt, z_task_attr_t *attr, _z_task_t *task) { // Init memory (void)memset(task, 0, sizeof(_z_task_t)); - zt->_transport._unicast._read_task_running = true; // Init before z_task_init for concurrency issue + zt->_transport._unicast._common._read_task_running = true; // Init before z_task_init for concurrency issue // Init task if (_z_task_init(task, attr, _zp_unicast_read_task, &zt->_transport._unicast) != _Z_RES_OK) { - zt->_transport._unicast._read_task_running = false; + zt->_transport._unicast._common._read_task_running = false; return _Z_ERR_SYSTEM_TASK_FAILED; } // Attach task - zt->_transport._unicast._read_task = task; + zt->_transport._unicast._common._read_task = task; return _Z_RES_OK; } z_result_t _zp_unicast_stop_read_task(_z_transport_t *zt) { - zt->_transport._unicast._read_task_running = false; + zt->_transport._unicast._common._read_task_running = false; return _Z_RES_OK; } diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index d109ae513..ca9591bba 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -32,28 +32,29 @@ z_result_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg) { _Z_DEBUG(">> recv session msg"); z_result_t ret = _Z_RES_OK; - _z_unicast_rx_mutex_lock(ztu); + _z_transport_rx_mutex_lock(&ztu->_common); 
size_t to_read = 0; do { - switch (ztu->_link._cap._flow) { + switch (ztu->_common._link._cap._flow) { // Stream capable links case Z_LINK_CAP_FLOW_STREAM: - if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); - if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_zbuf_compact(&ztu->_zbuf); + if (_z_zbuf_len(&ztu->_common._zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_link_recv_zbuf(&ztu->_common._link, &ztu->_common._zbuf, NULL); + if (_z_zbuf_len(&ztu->_common._zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_zbuf_compact(&ztu->_common._zbuf); ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; continue; } } // Get stream size - to_read = _z_read_stream_size(&ztu->_zbuf); + to_read = _z_read_stream_size(&ztu->_common._zbuf); // Read data - if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); - if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztu->_zbuf); + if (_z_zbuf_len(&ztu->_common._zbuf) < to_read) { + _z_link_recv_zbuf(&ztu->_common._link, &ztu->_common._zbuf, NULL); + if (_z_zbuf_len(&ztu->_common._zbuf) < to_read) { + _z_zbuf_set_rpos(&ztu->_common._zbuf, + _z_zbuf_get_rpos(&ztu->_common._zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztu->_common._zbuf); ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; continue; } @@ -61,8 +62,8 @@ z_result_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_me break; // Datagram capable links case Z_LINK_CAP_FLOW_DATAGRAM: - _z_zbuf_compact(&ztu->_zbuf); - to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + _z_zbuf_compact(&ztu->_common._zbuf); + to_read = _z_link_recv_zbuf(&ztu->_common._link, &ztu->_common._zbuf, NULL); if (to_read == SIZE_MAX) { ret = _Z_ERR_TRANSPORT_RX_FAILED; } @@ -74,14 +75,14 @@ z_result_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_me if (ret == _Z_RES_OK) { _Z_DEBUG(">> \t 
transport_message_decode"); - ret = _z_transport_message_decode(t_msg, &ztu->_zbuf); + ret = _z_transport_message_decode(t_msg, &ztu->_common._zbuf); // Mark the session that we have received data if (ret == _Z_RES_OK) { ztu->_received = true; } } - _z_unicast_rx_mutex_unlock(ztu); + _z_transport_rx_mutex_unlock(&ztu->_common); return ret; } @@ -99,7 +100,7 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R) == true) { // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured - if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._frame._sn) == true) { + if (_z_sn_precedes(ztu->_common._sn_res, ztu->_sn_rx_reliable, t_msg->_body._frame._sn) == true) { ztu->_sn_rx_reliable = t_msg->_body._frame._sn; } else { #if Z_FEATURE_FRAGMENTATION == 1 @@ -110,7 +111,7 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t break; } } else { - if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._frame._sn) == true) { + if (_z_sn_precedes(ztu->_common._sn_res, ztu->_sn_rx_best_effort, t_msg->_body._frame._sn) == true) { ztu->_sn_rx_best_effort = t_msg->_body._frame._sn; } else { #if Z_FEATURE_FRAGMENTATION == 1 @@ -127,7 +128,7 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t for (size_t i = 0; i < len; i++) { _z_network_message_t *zm = _z_network_message_vec_get(&t_msg->_body._frame._messages, i); zm->_reliability = _z_t_msg_get_reliability(t_msg); - _z_handle_network_message(ztu->_session, zm, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE); + _z_handle_network_message(ztu->_common._session, zm, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE); } break; @@ -190,7 +191,7 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t ret = _z_network_message_decode(&zm, &zbf); zm._reliability = _z_t_msg_get_reliability(t_msg); if (ret == _Z_RES_OK) { - 
_z_handle_network_message(ztu->_session, &zm, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE); + _z_handle_network_message(ztu->_common._session, &zm, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE); } else { _Z_INFO("Failed to decode defragmented message"); ret = _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; @@ -239,21 +240,21 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t z_result_t _z_unicast_update_rx_buffer(_z_transport_unicast_t *ztu) { // Check if user or defragment buffer took ownership of buffer - if (_z_zbuf_get_ref_count(&ztu->_zbuf) != 1) { + if (_z_zbuf_get_ref_count(&ztu->_common._zbuf) != 1) { // Allocate a new buffer - size_t buff_capacity = _z_zbuf_capacity(&ztu->_zbuf); + size_t buff_capacity = _z_zbuf_capacity(&ztu->_common._zbuf); _z_zbuf_t new_zbuf = _z_zbuf_make(buff_capacity); if (_z_zbuf_capacity(&new_zbuf) != buff_capacity) { return _Z_ERR_SYSTEM_OUT_OF_MEMORY; } // Recopy leftover bytes - size_t leftovers = _z_zbuf_len(&ztu->_zbuf); + size_t leftovers = _z_zbuf_len(&ztu->_common._zbuf); if (leftovers > 0) { - _z_zbuf_copy_bytes(&new_zbuf, &ztu->_zbuf); + _z_zbuf_copy_bytes(&new_zbuf, &ztu->_common._zbuf); } // Drop buffer & update - _z_zbuf_clear(&ztu->_zbuf); - ztu->_zbuf = new_zbuf; + _z_zbuf_clear(&ztu->_common._zbuf); + ztu->_common._zbuf = new_zbuf; } return _Z_RES_OK; } diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 99357c1ba..fe48e3f24 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -25,7 +25,6 @@ #include "zenoh-pico/transport/unicast/lease.h" #include "zenoh-pico/transport/unicast/read.h" #include "zenoh-pico/transport/unicast/rx.h" -#include "zenoh-pico/transport/unicast/tx.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" @@ -40,17 +39,17 @@ z_result_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, // Initialize batching data #if Z_FEATURE_BATCHING == 1 - ztu->_batch_state = _Z_BATCHING_IDLE; - 
ztu->_batch = _z_network_message_vec_make(0); + ztu->_common._batch_state = _Z_BATCHING_IDLE; + ztu->_common._batch_count = 0; #endif #if Z_FEATURE_MULTI_THREAD == 1 // Initialize the mutexes - ret = _z_mutex_init(&ztu->_mutex_tx); + ret = _z_mutex_init(&ztu->_common._mutex_tx); if (ret == _Z_RES_OK) { - ret = _z_mutex_init(&ztu->_mutex_rx); + ret = _z_mutex_init(&ztu->_common._mutex_rx); if (ret != _Z_RES_OK) { - _z_mutex_drop(&ztu->_mutex_tx); + _z_mutex_drop(&ztu->_common._mutex_tx); } } #endif // Z_FEATURE_MULTI_THREAD == 1 @@ -62,21 +61,22 @@ z_result_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, size_t zbuf_size = param->_batch_size; // Initialize tx rx buffers - ztu->_wbuf = _z_wbuf_make(wbuf_size, false); - ztu->_zbuf = _z_zbuf_make(zbuf_size); + ztu->_common._wbuf = _z_wbuf_make(wbuf_size, false); + ztu->_common._zbuf = _z_zbuf_make(zbuf_size); // Clean up the buffers if one of them failed to be allocated - if ((_z_wbuf_capacity(&ztu->_wbuf) != wbuf_size) || (_z_zbuf_capacity(&ztu->_zbuf) != zbuf_size)) { + if ((_z_wbuf_capacity(&ztu->_common._wbuf) != wbuf_size) || + (_z_zbuf_capacity(&ztu->_common._zbuf) != zbuf_size)) { ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; _Z_ERROR("Not enough memory to allocate transport tx rx buffers!"); #if Z_FEATURE_MULTI_THREAD == 1 - _z_mutex_drop(&ztu->_mutex_tx); - _z_mutex_drop(&ztu->_mutex_rx); + _z_mutex_drop(&ztu->_common._mutex_tx); + _z_mutex_drop(&ztu->_common._mutex_rx); #endif // Z_FEATURE_MULTI_THREAD == 1 - _z_wbuf_clear(&ztu->_wbuf); - _z_zbuf_clear(&ztu->_zbuf); + _z_wbuf_clear(&ztu->_common._wbuf); + _z_zbuf_clear(&ztu->_common._zbuf); } #if Z_FEATURE_FRAGMENTATION == 1 @@ -90,34 +90,34 @@ z_result_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, if (ret == _Z_RES_OK) { // Set default SN resolution - ztu->_sn_res = _z_sn_max(param->_seq_num_res); + ztu->_common._sn_res = _z_sn_max(param->_seq_num_res); // The initial SN at TX side - ztu->_sn_tx_reliable = param->_initial_sn_tx; - 
ztu->_sn_tx_best_effort = param->_initial_sn_tx; + ztu->_common._sn_tx_reliable = param->_initial_sn_tx; + ztu->_common._sn_tx_best_effort = param->_initial_sn_tx; // The initial SN at RX side - _z_zint_t initial_sn_rx = _z_sn_decrement(ztu->_sn_res, param->_initial_sn_rx); + _z_zint_t initial_sn_rx = _z_sn_decrement(ztu->_common._sn_res, param->_initial_sn_rx); ztu->_sn_rx_reliable = initial_sn_rx; ztu->_sn_rx_best_effort = initial_sn_rx; #if Z_FEATURE_MULTI_THREAD == 1 // Tasks - ztu->_read_task_running = false; - ztu->_read_task = NULL; - ztu->_lease_task_running = false; - ztu->_lease_task = NULL; + ztu->_common._read_task_running = false; + ztu->_common._read_task = NULL; + ztu->_common._lease_task_running = false; + ztu->_common._lease_task = NULL; #endif // Z_FEATURE_MULTI_THREAD == 1 // Notifiers ztu->_received = 0; - ztu->_transmitted = 0; + ztu->_common._transmitted = 0; // Transport lease - ztu->_lease = param->_lease; + ztu->_common._lease = param->_lease; // Transport link for unicast - ztu->_link = *zl; + ztu->_common._link = *zl; // Remote peer PID ztu->_remote_zid = param->_remote_zid; @@ -309,7 +309,7 @@ z_result_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, bo z_result_t ret = _Z_RES_OK; // Send and clear message _z_transport_message_t cm = _z_t_msg_make_close(reason, link_only); - ret = _z_unicast_send_t_msg(ztu, &cm); + ret = _z_transport_tx_send_t_msg(&ztu->_common, &cm); _z_t_msg_clear(&cm); return ret; } @@ -322,27 +322,23 @@ void _z_unicast_transport_clear(_z_transport_t *zt) { _z_transport_unicast_t *ztu = &zt->_transport._unicast; #if Z_FEATURE_MULTI_THREAD == 1 // Clean up tasks - if (ztu->_read_task != NULL) { - _z_task_join(ztu->_read_task); - _z_task_free(&ztu->_read_task); + if (ztu->_common._read_task != NULL) { + _z_task_join(ztu->_common._read_task); + _z_task_free(&ztu->_common._read_task); } - if (ztu->_lease_task != NULL) { - _z_task_join(ztu->_lease_task); - _z_task_free(&ztu->_lease_task); + if 
(ztu->_common._lease_task != NULL) { + _z_task_join(ztu->_common._lease_task); + _z_task_free(&ztu->_common._lease_task); } // Clean up the mutexes - _z_mutex_drop(&ztu->_mutex_tx); - _z_mutex_drop(&ztu->_mutex_rx); + _z_mutex_drop(&ztu->_common._mutex_tx); + _z_mutex_drop(&ztu->_common._mutex_rx); #endif // Z_FEATURE_MULTI_THREAD == 1 -#if Z_FEATURE_BATCHING == 1 - _z_network_message_vec_clear(&ztu->_batch); -#endif - // Clean up the buffers - _z_wbuf_clear(&ztu->_wbuf); - _z_zbuf_clear(&ztu->_zbuf); + _z_wbuf_clear(&ztu->_common._wbuf); + _z_zbuf_clear(&ztu->_common._zbuf); #if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_clear(&ztu->_dbuf_reliable); _z_wbuf_clear(&ztu->_dbuf_best_effort); @@ -350,7 +346,7 @@ void _z_unicast_transport_clear(_z_transport_t *zt) { // Clean up PIDs ztu->_remote_zid = _z_id_empty(); - _z_link_clear(&ztu->_link); + _z_link_clear(&ztu->_common._link); } #else diff --git a/src/transport/unicast/tx.c b/src/transport/unicast/tx.c deleted file mode 100644 index 78bc04f86..000000000 --- a/src/transport/unicast/tx.c +++ /dev/null @@ -1,289 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
-// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#include "zenoh-pico/transport/common/tx.h" - -#include - -#include "zenoh-pico/config.h" -#include "zenoh-pico/protocol/codec/network.h" -#include "zenoh-pico/protocol/codec/transport.h" -#include "zenoh-pico/protocol/iobuf.h" -#include "zenoh-pico/transport/unicast/transport.h" -#include "zenoh-pico/transport/unicast/tx.h" -#include "zenoh-pico/transport/utils.h" -#include "zenoh-pico/utils/logging.h" - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - -/** - * This function is unsafe because it operates in potentially concurrent data. - * Make sure that the following mutexes are locked before calling this function: - * - ztu->_mutex_inner - */ -static _z_zint_t __unsafe_z_unicast_get_sn(_z_transport_unicast_t *ztu, z_reliability_t reliability) { - _z_zint_t sn; - if (reliability == Z_RELIABILITY_RELIABLE) { - sn = ztu->_sn_tx_reliable; - ztu->_sn_tx_reliable = _z_sn_increment(ztu->_sn_res, ztu->_sn_tx_reliable); - } else { - sn = ztu->_sn_tx_best_effort; - ztu->_sn_tx_best_effort = _z_sn_increment(ztu->_sn_res, ztu->_sn_tx_best_effort); - } - return sn; -} - -#if Z_FEATURE_FRAGMENTATION == 1 -static z_result_t __unsafe_z_unicast_send_fragment(_z_transport_unicast_t *ztu, _z_wbuf_t *fbf, - const _z_network_message_t *n_msg, z_reliability_t reliability, - _z_zint_t first_sn) { - bool is_first = true; - _z_zint_t sn = first_sn; - // Encode message on temp buffer - _Z_RETURN_IF_ERR(_z_network_message_encode(fbf, n_msg)); - // Fragment message - while (_z_wbuf_len(fbf) > 0) { - // Get fragment sequence number - if (!is_first) { - sn = __unsafe_z_unicast_get_sn(ztu, reliability); - } - is_first = false; - // Serialize fragment - __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); - z_result_t ret = __unsafe_z_serialize_zenoh_fragment(&ztu->_wbuf, fbf, reliability, sn); - if (ret != _Z_RES_OK) { - _Z_ERROR("Fragment serialization failed with err %d", ret); - 
return ret; - } - // Send fragment - __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); - _Z_RETURN_IF_ERR(_z_link_send_wbuf(&ztu->_link, &ztu->_wbuf)); - ztu->_transmitted = true; // Tell session we transmitted data - } - return _Z_RES_OK; -} -#else -static z_result_t __unsafe_z_unicast_send_fragment(_z_transport_unicast_t *ztu, _z_wbuf_t *fbf, - const _z_network_message_t *n_msg, z_reliability_t reliability, - _z_zint_t first_sn) { - _ZP_UNUSED(ztu); - _ZP_UNUSED(fbf); - _ZP_UNUSED(n_msg); - _ZP_UNUSED(reliability); - _ZP_UNUSED(first_sn); - _Z_INFO("Sending the message required fragmentation feature that is deactivated."); - return _Z_RES_OK; -} -#endif - -static z_result_t __unsafe_z_unicast_message_send(_z_transport_unicast_t *ztu, const _z_network_message_t *n_msg, - z_reliability_t reliability) { - _Z_DEBUG("Send network message"); - // Encode frame header - __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); - _z_zint_t sn = __unsafe_z_unicast_get_sn(ztu, reliability); - _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); - _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztu->_wbuf, &t_msg)); - // Encode the network message - z_result_t ret = _z_network_message_encode(&ztu->_wbuf, n_msg); - // The message does not fit in the current batch, let's fragment it - if (ret != _Z_RES_OK) { - // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); - // Send message as fragments - ret = __unsafe_z_unicast_send_fragment(ztu, &fbf, n_msg, reliability, sn); - // Clear the buffer as it's no longer required - _z_wbuf_clear(&fbf); - return ret; - } - // Send network message - __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); - _Z_RETURN_IF_ERR(_z_link_send_wbuf(&ztu->_link, &ztu->_wbuf)); - ztu->_transmitted = true; // Tell session we transmitted data - return _Z_RES_OK; -} - -#if Z_FEATURE_BATCHING == 1 -static z_result_t 
__unsafe_z_unicast_message_batch(_z_transport_unicast_t *ztu, const _z_network_message_t *n_msg) { - _Z_DEBUG("Batching network message"); - // Copy network message - _z_network_message_t *batch_msg = z_malloc(sizeof(_z_network_message_t)); - if (batch_msg == NULL) { - return _Z_ERR_SYSTEM_OUT_OF_MEMORY; - } - if (_z_n_msg_copy(batch_msg, n_msg) != _Z_RES_OK) { - z_free(batch_msg); - return _Z_ERR_SYSTEM_OUT_OF_MEMORY; - } - _z_network_message_vec_append(&ztu->_batch, batch_msg); - return _Z_RES_OK; -} - -static z_result_t __unsafe_unicast_batch_send(_z_transport_unicast_t *ztu, z_reliability_t reliability) { - z_result_t ret = _Z_RES_OK; - // Get network message number - size_t msg_nb = _z_network_message_vec_len(&ztu->_batch); - size_t msg_idx = 0; - size_t curr_msg_nb = 0; - if (msg_nb == 0) { - return _Z_RES_OK; - } - // Encode the frame header - __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); - _z_zint_t sn = __unsafe_z_unicast_get_sn(ztu, reliability); - _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); - _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztu->_wbuf, &t_msg)); - size_t curr_wpos = _z_wbuf_get_wpos(&ztu->_wbuf); - // Process batch - while (msg_idx < msg_nb) { - // Encode a network message - _z_network_message_t *n_msg = _z_network_message_vec_get(&ztu->_batch, msg_idx); - assert(n_msg != NULL); - if (_z_network_message_encode(&ztu->_wbuf, n_msg) != _Z_RES_OK) { - // Remove partially encoded data - _z_wbuf_set_wpos(&ztu->_wbuf, curr_wpos); - // Handle case where one message is too big to fit in frame - if (curr_msg_nb == 0) { - _Z_INFO("Batch sending interrupted by a message needing to be fragmented."); - // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); - // Send message as fragments - ret = __unsafe_z_unicast_send_fragment(ztu, &fbf, n_msg, reliability, sn); - // Clear the buffer as it's no longer required - _z_wbuf_clear(&fbf); - if (ret != 
_Z_RES_OK) { - _Z_ERROR("Send fragmented message failed with err %d.", ret); - } - // Message is sent or skipped - msg_idx++; - } else { // Frame has messages but is full - _Z_INFO("Sending batch in multiple frames because it is too big for one"); - // Send frame - __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); - _Z_RETURN_IF_ERR(_z_link_send_wbuf(&ztu->_link, &ztu->_wbuf)); - ztu->_transmitted = true; - curr_msg_nb = 0; - } - // Reset frame - __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); - sn = __unsafe_z_unicast_get_sn(ztu, reliability); - t_msg = _z_t_msg_make_frame_header(sn, reliability); - _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztu->_wbuf, &t_msg)); - } else { - curr_wpos = _z_wbuf_get_wpos(&ztu->_wbuf); - msg_idx++; - curr_msg_nb++; - } - } - // Send frame - __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); - _Z_RETURN_IF_ERR(_z_link_send_wbuf(&ztu->_link, &ztu->_wbuf)); - ztu->_transmitted = true; // Tell session we transmitted data - return ret; -} -#endif - -z_result_t _z_unicast_send_t_msg(_z_transport_unicast_t *ztu, const _z_transport_message_t *t_msg) { - z_result_t ret = _Z_RES_OK; - _Z_DEBUG("Send session message"); - _z_unicast_tx_mutex_lock(ztu, true); - - // Encode transport message - __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); - ret = _z_transport_message_encode(&ztu->_wbuf, t_msg); - if (ret == _Z_RES_OK) { - // Send message - __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); - ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); - if (ret == _Z_RES_OK) { - ztu->_transmitted = true; // Tell session we transmitted data - } - } - _z_unicast_tx_mutex_unlock(ztu); - return ret; -} - -z_result_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, z_reliability_t reliability, - z_congestion_control_t cong_ctrl) { - z_result_t ret = _Z_RES_OK; - _z_transport_unicast_t *ztu = &zn->_tp._transport._unicast; - - // Acquire the lock and drop the message if 
needed - ret = _z_unicast_tx_mutex_lock(ztu, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK); - if (ret != _Z_RES_OK) { - _Z_INFO("Dropping zenoh message because of congestion control"); - return ret; - } - // Process batching -#if Z_FEATURE_BATCHING == 1 - if (ztu->_batch_state == _Z_BATCHING_ACTIVE) { - ret = __unsafe_z_unicast_message_batch(ztu, n_msg); - } else { - ret = __unsafe_z_unicast_message_send(ztu, n_msg, reliability); - } -#else - ret = __unsafe_z_unicast_message_send(ztu, n_msg, reliability); -#endif - _z_unicast_tx_mutex_unlock(ztu); - return ret; -} - -z_result_t _z_unicast_send_n_batch(_z_session_t *zn, z_reliability_t reliability, z_congestion_control_t cong_ctrl) { -#if Z_FEATURE_BATCHING == 1 - _Z_DEBUG("Send network batch"); - _z_transport_unicast_t *ztu = &zn->_tp._transport._unicast; - // Acquire the lock and drop the message if needed - z_result_t ret = _z_unicast_tx_mutex_lock(ztu, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK); - if (ret != _Z_RES_OK) { - _Z_INFO("Dropping zenoh batch because of congestion control"); - return ret; - } - // Send batch - ret = __unsafe_unicast_batch_send(ztu, reliability); - // Clean up - _z_network_message_vec_clear(&ztu->_batch); - _z_unicast_tx_mutex_unlock(ztu); - return ret; -#else - _ZP_UNUSED(zn); - _ZP_UNUSED(reliability); - _ZP_UNUSED(cong_ctrl); - _Z_ERROR("Tried to send batch but batching feature is deactivated."); - return _Z_ERR_TRANSPORT_TX_FAILED; -#endif -} - -#else -z_result_t _z_unicast_send_t_msg(_z_transport_unicast_t *ztu, const _z_transport_message_t *t_msg) { - _ZP_UNUSED(ztu); - _ZP_UNUSED(t_msg); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -z_result_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, z_reliability_t reliability, - z_congestion_control_t cong_ctrl) { - _ZP_UNUSED(zn); - _ZP_UNUSED(n_msg); - _ZP_UNUSED(reliability); - _ZP_UNUSED(cong_ctrl); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -z_result_t _z_unicast_send_n_batch(_z_session_t *zn, z_reliability_t 
reliability, z_congestion_control_t cong_ctrl) { - _ZP_UNUSED(zn); - _ZP_UNUSED(reliability); - _ZP_UNUSED(cong_ctrl); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1