Skip to content

Commit

Permalink
Batching v2 (#765)
Browse files Browse the repository at this point in the history
* feat: switch id empty/check to static inline

* feat: use batch count instead of batch vec

* refactor: batch don't copy messages

* feat: rename batch api

* feat: group common transport fields

* feat: use common struct in tx functions

* feat: rename multicast tx functions

* fix: unreachable

* refactor: fuse unicast and multicast tx
jean-roland authored Oct 28, 2024
1 parent 5dd1fcd commit 7388049
Showing 35 changed files with 645 additions and 1,149 deletions.
6 changes: 3 additions & 3 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
@@ -2060,7 +2060,7 @@ const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subs
/**
* Activate the batching mechanism.
* Any message that would have been sent on the network by a subsequent api call (e.g z_put, z_get)
* will be instead stored until the batch is flushed with :c:func:`zp_batch_flush`.
* will be instead stored until the batch is full or batching is stopped with :c:func:`zp_batch_stop`.
*
* Parameters:
* zs: Pointer to a :c:type:`z_loaned_session_t` that will start batching messages.
@@ -2071,15 +2071,15 @@ const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subs
z_result_t zp_batch_start(const z_loaned_session_t *zs);

/**
* Deactivate the batching mechanism and flush the messages that were stored.
* Deactivate the batching mechanism and flush the remaining messages.
*
* Parameters:
* zs: Pointer to a :c:type:`z_loaned_session_t` that will stop batching messages.
*
* Return:
* ``0`` if batching stopped and batch successfully sent, ``negative value`` otherwise.
*/
z_result_t zp_batch_flush(const z_loaned_session_t *zs);
z_result_t zp_batch_stop(const z_loaned_session_t *zs);
#endif

/************* Multi Thread Tasks helpers **************/
5 changes: 3 additions & 2 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
@@ -54,9 +54,10 @@ typedef size_t _z_zint_t;
typedef struct {
uint8_t id[16];
} _z_id_t;
extern const _z_id_t empty_id;
uint8_t _z_id_len(_z_id_t id);
bool _z_id_check(_z_id_t id);
_z_id_t _z_id_empty(void);
static inline bool _z_id_check(_z_id_t id) { return memcmp(&id, &empty_id, sizeof(id)) != 0; }
static inline _z_id_t _z_id_empty(void) { return (_z_id_t){0}; }

/**
* A zenoh timestamp.
3 changes: 0 additions & 3 deletions include/zenoh-pico/session/utils.h
Original file line number Diff line number Diff line change
@@ -31,9 +31,6 @@ void _z_session_clear(_z_session_t *zn);
z_result_t _z_session_close(_z_session_t *zn, uint8_t reason);

z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *z_msg, uint16_t local_peer_id);
z_result_t _z_send_n_msg(_z_session_t *zn, _z_network_message_t *n_msg, z_reliability_t reliability,
z_congestion_control_t cong_ctrl);
z_result_t _z_send_n_batch(_z_session_t *zn, z_reliability_t reliability, z_congestion_control_t cong_ctrl);

#if Z_FEATURE_MULTI_THREAD == 1
static inline void _z_session_mutex_lock(_z_session_t *zn) { (void)_z_mutex_lock(&zn->_mutex_inner); }
4 changes: 4 additions & 0 deletions include/zenoh-pico/transport/common/tx.h
Original file line number Diff line number Diff line change
@@ -26,7 +26,11 @@ void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability);
z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn);

/*------------------ Transmission and Reception helpers ------------------*/
z_result_t _z_transport_tx_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg);
z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg);
z_result_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg);
z_result_t _z_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, z_reliability_t reliability,
z_congestion_control_t cong_ctrl);
z_result_t _z_send_n_batch(_z_session_t *zn, z_congestion_control_t cong_ctrl);

#endif /* ZENOH_PICO_TRANSPORT_TX_H */
21 changes: 0 additions & 21 deletions include/zenoh-pico/transport/multicast/transport.h
Original file line number Diff line number Diff line change
@@ -28,31 +28,10 @@ z_result_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t r
void _z_multicast_transport_clear(_z_transport_t *zt);

#if (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) && Z_FEATURE_MULTI_THREAD == 1
static inline z_result_t _z_multicast_tx_mutex_lock(_z_transport_multicast_t *ztm, bool block) {
if (block) {
_z_mutex_lock(&ztm->_mutex_tx);
return _Z_RES_OK;
} else {
return _z_mutex_try_lock(&ztm->_mutex_tx);
}
}
static inline void _z_multicast_tx_mutex_unlock(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_tx); }
static inline void _z_multicast_rx_mutex_lock(_z_transport_multicast_t *ztm) { _z_mutex_lock(&ztm->_mutex_rx); }
static inline void _z_multicast_rx_mutex_unlock(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_rx); }
static inline void _z_multicast_peer_mutex_lock(_z_transport_multicast_t *ztm) { _z_mutex_lock(&ztm->_mutex_peer); }
static inline void _z_multicast_peer_mutex_unlock(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_peer); }

#else
static inline z_result_t _z_multicast_tx_mutex_lock(_z_transport_multicast_t *ztm, bool block) {
_ZP_UNUSED(ztm);
_ZP_UNUSED(block);
return _Z_RES_OK;
}
static inline void _z_multicast_tx_mutex_unlock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
static inline void _z_multicast_rx_mutex_lock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
static inline void _z_multicast_rx_mutex_unlock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
static inline void _z_multicast_peer_mutex_lock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
static inline void _z_multicast_peer_mutex_unlock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }

#endif // (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) && Z_FEATURE_MULTI_THREAD == 1
#endif /* ZENOH_PICO_MULTICAST_TRANSPORT_H */
26 changes: 0 additions & 26 deletions include/zenoh-pico/transport/multicast/tx.h

This file was deleted.

2 changes: 1 addition & 1 deletion include/zenoh-pico/transport/raweth/tx.h
Original file line number Diff line number Diff line change
@@ -21,6 +21,6 @@
z_result_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg);
z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability,
z_congestion_control_t cong_ctrl);
z_result_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg);
z_result_t _z_raweth_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg);

#endif /* ZENOH_PICO_RAWETH_TX_H */
141 changes: 59 additions & 82 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
@@ -37,25 +37,23 @@ enum _z_batching_state_e {
};

typedef struct {
#if Z_FEATURE_FRAGMENTATION == 1
// Defragmentation buffers
uint8_t _state_reliable;
uint8_t _state_best_effort;
_z_wbuf_t _dbuf_reliable;
_z_wbuf_t _dbuf_best_effort;
#endif

_z_id_t _remote_zid;
_z_slice_t _remote_addr;
_z_conduit_sn_list_t _sn_rx_sns;

// SN numbers
_z_zint_t _sn_res;
volatile _z_zint_t _lease;
volatile _z_zint_t _next_lease;

uint16_t _peer_id;
volatile bool _received;

#if Z_FEATURE_FRAGMENTATION == 1
// Defragmentation buffers
uint8_t _state_reliable;
uint8_t _state_best_effort;
_z_wbuf_t _dbuf_reliable;
_z_wbuf_t _dbuf_best_effort;
#endif
} _z_transport_peer_entry_t;

size_t _z_transport_peer_entry_size(const _z_transport_peer_entry_t *src);
@@ -71,107 +69,64 @@ _z_transport_peer_entry_list_t *_z_transport_peer_entry_list_insert(_z_transport
// Forward type declaration to avoid cyclical include
typedef struct _z_session_rc_t _z_session_rc_ref_t;

// Forward declaration to be used in _zp_f_send_tmsg*
typedef struct _z_transport_multicast_t _z_transport_multicast_t;
// Send function prototype
typedef z_result_t (*_zp_f_send_tmsg)(_z_transport_multicast_t *self, const _z_transport_message_t *t_msg);

typedef struct {
// Session associated to the transport
_z_session_rc_ref_t *_session;

#if Z_FEATURE_MULTI_THREAD == 1
// TX and RX mutexes
_z_mutex_t _mutex_rx;
_z_mutex_t _mutex_tx;
#endif // Z_FEATURE_MULTI_THREAD == 1

_z_link_t _link;

#if Z_FEATURE_FRAGMENTATION == 1
// Defragmentation buffer
uint8_t _state_reliable;
uint8_t _state_best_effort;
_z_wbuf_t _dbuf_reliable;
_z_wbuf_t _dbuf_best_effort;
#endif

// Regular Buffers
// TX and RX buffers
_z_wbuf_t _wbuf;
_z_zbuf_t _zbuf;

_z_id_t _remote_zid;

// SN numbers
_z_zint_t _sn_res;
_z_zint_t _sn_tx_reliable;
_z_zint_t _sn_tx_best_effort;
_z_zint_t _sn_rx_reliable;
_z_zint_t _sn_rx_best_effort;
volatile _z_zint_t _lease;

// Transport batching
#if Z_FEATURE_BATCHING == 1
uint8_t _batch_state;
_z_network_message_vec_t _batch;
#endif

#if Z_FEATURE_MULTI_THREAD == 1
_z_task_t *_read_task;
_z_task_t *_lease_task;
volatile bool _read_task_running;
volatile bool _lease_task_running;
#endif // Z_FEATURE_MULTI_THREAD == 1

volatile bool _received;
volatile bool _transmitted;
} _z_transport_unicast_t;

typedef struct _z_transport_multicast_t {
// Session associated to the transport
_z_session_rc_ref_t *_session;

#if Z_FEATURE_MULTI_THREAD == 1
// TX and RX mutexes
_z_mutex_t _mutex_rx;
_z_mutex_t _mutex_tx;

// Peer list mutex
_z_mutex_t _mutex_peer;
#endif // Z_FEATURE_MULTI_THREAD == 1

_z_task_t *_read_task;
_z_task_t *_lease_task;
volatile bool _read_task_running;
volatile bool _lease_task_running;
#endif
// Transport batching
#if Z_FEATURE_BATCHING == 1
uint8_t _batch_state;
_z_network_message_vec_t _batch;
size_t _batch_count;
#endif
} _z_transport_common_t;

_z_link_t _link;
// Send function prototype
typedef z_result_t (*_zp_f_send_tmsg)(_z_transport_common_t *self, const _z_transport_message_t *t_msg);

// TX and RX buffers
_z_wbuf_t _wbuf;
_z_zbuf_t _zbuf;
typedef struct {
_z_transport_common_t _common;
_z_id_t _remote_zid;
_z_zint_t _sn_rx_reliable;
_z_zint_t _sn_rx_best_effort;
volatile bool _received;

// SN initial numbers
_z_zint_t _sn_res;
_z_zint_t _sn_tx_reliable;
_z_zint_t _sn_tx_best_effort;
volatile _z_zint_t _lease;
#if Z_FEATURE_FRAGMENTATION == 1
// Defragmentation buffer
uint8_t _state_reliable;
uint8_t _state_best_effort;
_z_wbuf_t _dbuf_reliable;
_z_wbuf_t _dbuf_best_effort;
#endif
} _z_transport_unicast_t;

typedef struct _z_transport_multicast_t {
_z_transport_common_t _common;
// Known valid peers
_z_transport_peer_entry_list_t *_peers;

// T message send function
_zp_f_send_tmsg _send_f;

#if Z_FEATURE_MULTI_THREAD == 1
_z_task_t *_read_task;
_z_task_t *_lease_task;
volatile bool _read_task_running;
volatile bool _lease_task_running;
#endif // Z_FEATURE_MULTI_THREAD == 1

volatile bool _transmitted;
_z_mutex_t _mutex_peer; // Peer list mutex
#endif
} _z_transport_multicast_t;

typedef struct {
@@ -214,4 +169,26 @@ bool _z_transport_start_batching(_z_transport_t *zt);
void _z_transport_stop_batching(_z_transport_t *zt);
#endif

#endif /* INCLUDE_ZENOH_PICO_TRANSPORT_TRANSPORT_H */
#if Z_FEATURE_MULTI_THREAD == 1
static inline z_result_t _z_transport_tx_mutex_lock(_z_transport_common_t *ztc, bool block) {
if (block) {
_z_mutex_lock(&ztc->_mutex_tx);
return _Z_RES_OK;
} else {
return _z_mutex_try_lock(&ztc->_mutex_tx);
}
}
static inline void _z_transport_tx_mutex_unlock(_z_transport_common_t *ztc) { _z_mutex_unlock(&ztc->_mutex_tx); }
static inline void _z_transport_rx_mutex_lock(_z_transport_common_t *ztc) { _z_mutex_lock(&ztc->_mutex_rx); }
static inline void _z_transport_rx_mutex_unlock(_z_transport_common_t *ztc) { _z_mutex_unlock(&ztc->_mutex_rx); }
#else
static inline z_result_t _z_transport_tx_mutex_lock(_z_transport_common_t *ztc, bool block) {
_ZP_UNUSED(ztc);
_ZP_UNUSED(block);
return _Z_RES_OK;
}
static inline void _z_transport_tx_mutex_unlock(_z_transport_common_t *ztc) { _ZP_UNUSED(ztc); }
static inline void _z_transport_rx_mutex_lock(_z_transport_common_t *ztc) { _ZP_UNUSED(ztc); }
static inline void _z_transport_rx_mutex_unlock(_z_transport_common_t *ztc) { _ZP_UNUSED(ztc); }
#endif // Z_FEATURE_MULTI_THREAD == 1
#endif /* INCLUDE_ZENOH_PICO_TRANSPORT_TRANSPORT_H */
25 changes: 1 addition & 24 deletions include/zenoh-pico/transport/unicast/transport.h
Original file line number Diff line number Diff line change
@@ -27,27 +27,4 @@ z_result_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, bo
z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason);
void _z_unicast_transport_clear(_z_transport_t *zt);

#if Z_FEATURE_UNICAST_TRANSPORT == 1 && Z_FEATURE_MULTI_THREAD == 1
static inline z_result_t _z_unicast_tx_mutex_lock(_z_transport_unicast_t *ztu, bool block) {
if (block) {
_z_mutex_lock(&ztu->_mutex_tx);
return _Z_RES_OK;
} else {
return _z_mutex_try_lock(&ztu->_mutex_tx);
}
}
static inline void _z_unicast_tx_mutex_unlock(_z_transport_unicast_t *ztu) { _z_mutex_unlock(&ztu->_mutex_tx); }
static inline void _z_unicast_rx_mutex_lock(_z_transport_unicast_t *ztu) { _z_mutex_lock(&ztu->_mutex_rx); }
static inline void _z_unicast_rx_mutex_unlock(_z_transport_unicast_t *ztu) { _z_mutex_unlock(&ztu->_mutex_rx); }

#else
static inline z_result_t _z_unicast_tx_mutex_lock(_z_transport_unicast_t *ztu, bool block) {
_ZP_UNUSED(ztu);
_ZP_UNUSED(block);
return _Z_RES_OK;
}
static inline void _z_unicast_tx_mutex_unlock(_z_transport_unicast_t *ztu) { _ZP_UNUSED(ztu); }
static inline void _z_unicast_rx_mutex_lock(_z_transport_unicast_t *ztu) { _ZP_UNUSED(ztu); }
static inline void _z_unicast_rx_mutex_unlock(_z_transport_unicast_t *ztu) { _ZP_UNUSED(ztu); }
#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 && Z_FEATURE_MULTI_THREAD == 1
#endif /* ZENOH_PICO_UNICAST_TRANSPORT_H */
#endif /* ZENOH_PICO_UNICAST_TRANSPORT_H */
26 changes: 0 additions & 26 deletions include/zenoh-pico/transport/unicast/tx.h

This file was deleted.

Loading

0 comments on commit 7388049

Please sign in to comment.