Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batching v2 #765

Merged
merged 9 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -2060,7 +2060,7 @@ const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subs
/**
* Activate the batching mechanism.
* Any message that would have been sent on the network by a subsequent api call (e.g z_put, z_get)
* will be instead stored until the batch is flushed with :c:func:`zp_batch_flush`.
* will be instead stored until the batch is full or batching is stopped with :c:func:`zp_batch_stop`.
*
* Parameters:
* zs: Pointer to a :c:type:`z_loaned_session_t` that will start batching messages.
Expand All @@ -2071,15 +2071,15 @@ const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subs
z_result_t zp_batch_start(const z_loaned_session_t *zs);

/**
* Deactivate the batching mechanism and flush the messages that were stored.
* Deactivate the batching mechanism and flush the remaining messages.
*
* Parameters:
* zs: Pointer to a :c:type:`z_loaned_session_t` that will stop batching messages.
*
* Return:
* ``0`` if batching stopped and batch successfully sent, ``negative value`` otherwise.
*/
z_result_t zp_batch_flush(const z_loaned_session_t *zs);
z_result_t zp_batch_stop(const z_loaned_session_t *zs);
#endif

/************* Multi Thread Tasks helpers **************/
Expand Down
5 changes: 3 additions & 2 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ typedef size_t _z_zint_t;
typedef struct {
uint8_t id[16];
} _z_id_t;
extern const _z_id_t empty_id;
uint8_t _z_id_len(_z_id_t id);
bool _z_id_check(_z_id_t id);
_z_id_t _z_id_empty(void);
static inline bool _z_id_check(_z_id_t id) { return memcmp(&id, &empty_id, sizeof(id)) != 0; }
static inline _z_id_t _z_id_empty(void) { return (_z_id_t){0}; }

/**
* A zenoh timestamp.
Expand Down
3 changes: 0 additions & 3 deletions include/zenoh-pico/session/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ void _z_session_clear(_z_session_t *zn);
z_result_t _z_session_close(_z_session_t *zn, uint8_t reason);

z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *z_msg, uint16_t local_peer_id);
z_result_t _z_send_n_msg(_z_session_t *zn, _z_network_message_t *n_msg, z_reliability_t reliability,
z_congestion_control_t cong_ctrl);
z_result_t _z_send_n_batch(_z_session_t *zn, z_reliability_t reliability, z_congestion_control_t cong_ctrl);

#if Z_FEATURE_MULTI_THREAD == 1
static inline void _z_session_mutex_lock(_z_session_t *zn) { (void)_z_mutex_lock(&zn->_mutex_inner); }
Expand Down
4 changes: 4 additions & 0 deletions include/zenoh-pico/transport/common/tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability);
z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn);

/*------------------ Transmission and Reception helpers ------------------*/
z_result_t _z_transport_tx_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg);
z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg);
z_result_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg);
z_result_t _z_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, z_reliability_t reliability,
z_congestion_control_t cong_ctrl);
z_result_t _z_send_n_batch(_z_session_t *zn, z_congestion_control_t cong_ctrl);

#endif /* ZENOH_PICO_TRANSPORT_TX_H */
21 changes: 0 additions & 21 deletions include/zenoh-pico/transport/multicast/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,10 @@ z_result_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t r
void _z_multicast_transport_clear(_z_transport_t *zt);

#if (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) && Z_FEATURE_MULTI_THREAD == 1
static inline z_result_t _z_multicast_tx_mutex_lock(_z_transport_multicast_t *ztm, bool block) {
if (block) {
_z_mutex_lock(&ztm->_mutex_tx);
return _Z_RES_OK;
} else {
return _z_mutex_try_lock(&ztm->_mutex_tx);
}
}
static inline void _z_multicast_tx_mutex_unlock(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_tx); }
static inline void _z_multicast_rx_mutex_lock(_z_transport_multicast_t *ztm) { _z_mutex_lock(&ztm->_mutex_rx); }
static inline void _z_multicast_rx_mutex_unlock(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_rx); }
static inline void _z_multicast_peer_mutex_lock(_z_transport_multicast_t *ztm) { _z_mutex_lock(&ztm->_mutex_peer); }
static inline void _z_multicast_peer_mutex_unlock(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_peer); }

#else
static inline z_result_t _z_multicast_tx_mutex_lock(_z_transport_multicast_t *ztm, bool block) {
_ZP_UNUSED(ztm);
_ZP_UNUSED(block);
return _Z_RES_OK;
}
static inline void _z_multicast_tx_mutex_unlock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
static inline void _z_multicast_rx_mutex_lock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
static inline void _z_multicast_rx_mutex_unlock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
static inline void _z_multicast_peer_mutex_lock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
static inline void _z_multicast_peer_mutex_unlock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }

#endif // (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) && Z_FEATURE_MULTI_THREAD == 1
#endif /* ZENOH_PICO_MULTICAST_TRANSPORT_H */
26 changes: 0 additions & 26 deletions include/zenoh-pico/transport/multicast/tx.h

This file was deleted.

2 changes: 1 addition & 1 deletion include/zenoh-pico/transport/raweth/tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@
z_result_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg);
z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability,
z_congestion_control_t cong_ctrl);
z_result_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg);
z_result_t _z_raweth_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg);

#endif /* ZENOH_PICO_RAWETH_TX_H */
141 changes: 59 additions & 82 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,23 @@ enum _z_batching_state_e {
};

typedef struct {
#if Z_FEATURE_FRAGMENTATION == 1
// Defragmentation buffers
uint8_t _state_reliable;
uint8_t _state_best_effort;
_z_wbuf_t _dbuf_reliable;
_z_wbuf_t _dbuf_best_effort;
#endif

_z_id_t _remote_zid;
_z_slice_t _remote_addr;
_z_conduit_sn_list_t _sn_rx_sns;

// SN numbers
_z_zint_t _sn_res;
volatile _z_zint_t _lease;
volatile _z_zint_t _next_lease;

uint16_t _peer_id;
volatile bool _received;

#if Z_FEATURE_FRAGMENTATION == 1
// Defragmentation buffers
uint8_t _state_reliable;
uint8_t _state_best_effort;
_z_wbuf_t _dbuf_reliable;
_z_wbuf_t _dbuf_best_effort;
#endif
} _z_transport_peer_entry_t;

size_t _z_transport_peer_entry_size(const _z_transport_peer_entry_t *src);
Expand All @@ -71,107 +69,64 @@ _z_transport_peer_entry_list_t *_z_transport_peer_entry_list_insert(_z_transport
// Forward type declaration to avoid cyclical include
typedef struct _z_session_rc_t _z_session_rc_ref_t;

// Forward declaration to be used in _zp_f_send_tmsg*
typedef struct _z_transport_multicast_t _z_transport_multicast_t;
// Send function prototype
typedef z_result_t (*_zp_f_send_tmsg)(_z_transport_multicast_t *self, const _z_transport_message_t *t_msg);

typedef struct {
// Session associated to the transport
_z_session_rc_ref_t *_session;

#if Z_FEATURE_MULTI_THREAD == 1
// TX and RX mutexes
_z_mutex_t _mutex_rx;
_z_mutex_t _mutex_tx;
#endif // Z_FEATURE_MULTI_THREAD == 1

_z_link_t _link;

#if Z_FEATURE_FRAGMENTATION == 1
// Defragmentation buffer
uint8_t _state_reliable;
uint8_t _state_best_effort;
_z_wbuf_t _dbuf_reliable;
_z_wbuf_t _dbuf_best_effort;
#endif

// Regular Buffers
// TX and RX buffers
_z_wbuf_t _wbuf;
_z_zbuf_t _zbuf;

_z_id_t _remote_zid;

// SN numbers
_z_zint_t _sn_res;
_z_zint_t _sn_tx_reliable;
_z_zint_t _sn_tx_best_effort;
_z_zint_t _sn_rx_reliable;
_z_zint_t _sn_rx_best_effort;
volatile _z_zint_t _lease;

// Transport batching
#if Z_FEATURE_BATCHING == 1
uint8_t _batch_state;
_z_network_message_vec_t _batch;
#endif

#if Z_FEATURE_MULTI_THREAD == 1
_z_task_t *_read_task;
_z_task_t *_lease_task;
volatile bool _read_task_running;
volatile bool _lease_task_running;
#endif // Z_FEATURE_MULTI_THREAD == 1

volatile bool _received;
volatile bool _transmitted;
} _z_transport_unicast_t;

typedef struct _z_transport_multicast_t {
// Session associated to the transport
_z_session_rc_ref_t *_session;

#if Z_FEATURE_MULTI_THREAD == 1
// TX and RX mutexes
_z_mutex_t _mutex_rx;
_z_mutex_t _mutex_tx;

// Peer list mutex
_z_mutex_t _mutex_peer;
#endif // Z_FEATURE_MULTI_THREAD == 1

_z_task_t *_read_task;
_z_task_t *_lease_task;
volatile bool _read_task_running;
volatile bool _lease_task_running;
#endif
// Transport batching
#if Z_FEATURE_BATCHING == 1
uint8_t _batch_state;
_z_network_message_vec_t _batch;
size_t _batch_count;
#endif
} _z_transport_common_t;

_z_link_t _link;
// Send function prototype
typedef z_result_t (*_zp_f_send_tmsg)(_z_transport_common_t *self, const _z_transport_message_t *t_msg);

// TX and RX buffers
_z_wbuf_t _wbuf;
_z_zbuf_t _zbuf;
typedef struct {
_z_transport_common_t _common;
_z_id_t _remote_zid;
_z_zint_t _sn_rx_reliable;
_z_zint_t _sn_rx_best_effort;
volatile bool _received;

// SN initial numbers
_z_zint_t _sn_res;
_z_zint_t _sn_tx_reliable;
_z_zint_t _sn_tx_best_effort;
volatile _z_zint_t _lease;
#if Z_FEATURE_FRAGMENTATION == 1
// Defragmentation buffer
uint8_t _state_reliable;
uint8_t _state_best_effort;
_z_wbuf_t _dbuf_reliable;
_z_wbuf_t _dbuf_best_effort;
#endif
} _z_transport_unicast_t;

typedef struct _z_transport_multicast_t {
_z_transport_common_t _common;
// Known valid peers
_z_transport_peer_entry_list_t *_peers;

// T message send function
_zp_f_send_tmsg _send_f;

#if Z_FEATURE_MULTI_THREAD == 1
_z_task_t *_read_task;
_z_task_t *_lease_task;
volatile bool _read_task_running;
volatile bool _lease_task_running;
#endif // Z_FEATURE_MULTI_THREAD == 1

volatile bool _transmitted;
_z_mutex_t _mutex_peer; // Peer list mutex
#endif
} _z_transport_multicast_t;

typedef struct {
Expand Down Expand Up @@ -214,4 +169,26 @@ bool _z_transport_start_batching(_z_transport_t *zt);
void _z_transport_stop_batching(_z_transport_t *zt);
#endif

#endif /* INCLUDE_ZENOH_PICO_TRANSPORT_TRANSPORT_H */
#if Z_FEATURE_MULTI_THREAD == 1
static inline z_result_t _z_transport_tx_mutex_lock(_z_transport_common_t *ztc, bool block) {
if (block) {
_z_mutex_lock(&ztc->_mutex_tx);
return _Z_RES_OK;
} else {
return _z_mutex_try_lock(&ztc->_mutex_tx);
}
}
static inline void _z_transport_tx_mutex_unlock(_z_transport_common_t *ztc) { _z_mutex_unlock(&ztc->_mutex_tx); }
static inline void _z_transport_rx_mutex_lock(_z_transport_common_t *ztc) { _z_mutex_lock(&ztc->_mutex_rx); }
static inline void _z_transport_rx_mutex_unlock(_z_transport_common_t *ztc) { _z_mutex_unlock(&ztc->_mutex_rx); }
#else
static inline z_result_t _z_transport_tx_mutex_lock(_z_transport_common_t *ztc, bool block) {
_ZP_UNUSED(ztc);
_ZP_UNUSED(block);
return _Z_RES_OK;
}
static inline void _z_transport_tx_mutex_unlock(_z_transport_common_t *ztc) { _ZP_UNUSED(ztc); }
static inline void _z_transport_rx_mutex_lock(_z_transport_common_t *ztc) { _ZP_UNUSED(ztc); }
static inline void _z_transport_rx_mutex_unlock(_z_transport_common_t *ztc) { _ZP_UNUSED(ztc); }
#endif // Z_FEATURE_MULTI_THREAD == 1
#endif /* INCLUDE_ZENOH_PICO_TRANSPORT_TRANSPORT_H */
25 changes: 1 addition & 24 deletions include/zenoh-pico/transport/unicast/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,4 @@ z_result_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, bo
z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason);
void _z_unicast_transport_clear(_z_transport_t *zt);

#if Z_FEATURE_UNICAST_TRANSPORT == 1 && Z_FEATURE_MULTI_THREAD == 1
static inline z_result_t _z_unicast_tx_mutex_lock(_z_transport_unicast_t *ztu, bool block) {
if (block) {
_z_mutex_lock(&ztu->_mutex_tx);
return _Z_RES_OK;
} else {
return _z_mutex_try_lock(&ztu->_mutex_tx);
}
}
static inline void _z_unicast_tx_mutex_unlock(_z_transport_unicast_t *ztu) { _z_mutex_unlock(&ztu->_mutex_tx); }
static inline void _z_unicast_rx_mutex_lock(_z_transport_unicast_t *ztu) { _z_mutex_lock(&ztu->_mutex_rx); }
static inline void _z_unicast_rx_mutex_unlock(_z_transport_unicast_t *ztu) { _z_mutex_unlock(&ztu->_mutex_rx); }

#else
static inline z_result_t _z_unicast_tx_mutex_lock(_z_transport_unicast_t *ztu, bool block) {
_ZP_UNUSED(ztu);
_ZP_UNUSED(block);
return _Z_RES_OK;
}
static inline void _z_unicast_tx_mutex_unlock(_z_transport_unicast_t *ztu) { _ZP_UNUSED(ztu); }
static inline void _z_unicast_rx_mutex_lock(_z_transport_unicast_t *ztu) { _ZP_UNUSED(ztu); }
static inline void _z_unicast_rx_mutex_unlock(_z_transport_unicast_t *ztu) { _ZP_UNUSED(ztu); }
#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 && Z_FEATURE_MULTI_THREAD == 1
#endif /* ZENOH_PICO_UNICAST_TRANSPORT_H */
#endif /* ZENOH_PICO_UNICAST_TRANSPORT_H */
26 changes: 0 additions & 26 deletions include/zenoh-pico/transport/unicast/tx.h

This file was deleted.

Loading
Loading