diff --git a/examples/unix/c11/z_pull.c b/examples/unix/c11/z_pull.c index ed25adc8d..69dc67d32 100644 --- a/examples/unix/c11/z_pull.c +++ b/examples/unix/c11/z_pull.c @@ -94,6 +94,7 @@ int main(int argc, char **argv) { } z_undeclare_subscriber(z_move(sub)); + z_drop(z_move(channel)); // Stop read and lease tasks for zenoh-pico zp_stop_read_task(z_loan(s)); diff --git a/include/zenoh-pico/api/handlers.h b/include/zenoh-pico/api/handlers.h index aae9356ea..595352daa 100644 --- a/include/zenoh-pico/api/handlers.h +++ b/include/zenoh-pico/api/handlers.h @@ -15,111 +15,74 @@ #define INCLUDE_ZENOH_PICO_API_HANDLERS_H #include -#include #include "zenoh-pico/api/macros.h" -#include "zenoh-pico/api/primitives.h" #include "zenoh-pico/api/types.h" #include "zenoh-pico/collections/element.h" -#include "zenoh-pico/collections/fifo.h" -#include "zenoh-pico/collections/ring.h" -#include "zenoh-pico/net/memory.h" -#include "zenoh-pico/system/platform.h" - -// -- Ring -typedef struct { - _z_ring_t _ring; -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_t _mutex; -#endif -} _z_channel_ring_t; // TODO(sashacmc): rename to sync_ring? - -_z_channel_ring_t *_z_channel_ring(size_t capacity); -void _z_channel_ring_push(const void *src, void *context, z_element_free_f element_free); -int8_t _z_channel_ring_pull(void *dst, void *context, z_element_copy_f element_copy); -void _z_channel_ring_clear(_z_channel_ring_t *ring, z_element_free_f free_f); - -// -- Fifo -typedef struct { - _z_fifo_t _fifo; -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_t _mutex; - zp_condvar_t _cv_not_full; - zp_condvar_t _cv_not_empty; -#endif -} _z_channel_fifo_t; - -_z_channel_fifo_t *_z_channel_fifo(size_t capacity); -void _z_channel_fifo_push(const void *src, void *context, z_element_free_f element_free); -int8_t _z_channel_fifo_pull(void *dst, void *context, z_element_copy_f element_copy); -void _z_channel_fifo_clear(_z_channel_fifo_t *fifo, z_element_free_f free_f); +#include "zenoh-pico/collections/fifo_mt.h" +#include "zenoh-pico/collections/ring_mt.h" +#include "zenoh-pico/utils/logging.h" // -- Samples handler -static inline size_t _z_owned_sample_size(z_owned_sample_t *s) { return sizeof(*s); } - -static inline void _z_owned_sample_copy(z_owned_sample_t *dst, const z_owned_sample_t *src) { - memcpy(dst, src, sizeof(z_owned_sample_t)); - // TODO(sashacmc): is it ok? - // Why not malloc + - // _z_sample_copy(dst->_value, src->_value); -} - -static inline z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src) { - z_owned_sample_t *dst = (z_owned_sample_t *)zp_malloc(sizeof(z_owned_sample_t)); - if (dst && src) { - dst->_value = (_z_sample_t *)zp_malloc(sizeof(_z_sample_t)); - _z_sample_copy(dst->_value, src); - } - return dst; -} +void _z_owned_sample_copy(z_owned_sample_t *dst, const z_owned_sample_t *src); +z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src); // -- Channel -#define _Z_CHANNEL_DEFINE(name, storage_type, send_closure_name, recv_closure_name, send_type, recv_type, \ - channel_push_f, channel_pull_f, storage_init_f, storage_free_f, elem_copy_f, elem_convert_f, \ - elem_free_f) \ - typedef struct { \ - z_owned_##send_closure_name##_t send; \ - z_owned_##recv_closure_name##_t recv; \ - storage_type *storage; \ - } z_owned_##name##_t; \ - static inline void z_##name##_elem_free(void **elem) { \ - elem_free_f((recv_type *)*elem); \ - *elem = NULL; \ - } \ - static inline void z_##name##_elem_copy(void *dst, const void *src) { \ - elem_copy_f((recv_type *)dst, (const recv_type *)src); \ - } \ - static inline void z_##name##_push(const send_type *elem, void *context) { \ - void *internal_elem = elem_convert_f(elem); \ - if (internal_elem == NULL) { \ - return; \ - } \ - channel_push_f(internal_elem, context, z_##name##_elem_free); \ - } \ - static inline void z_##name##_pull(recv_type *elem, void *context) { \ - channel_pull_f(elem, context, z_##name##_elem_copy); \ - } \ - static inline z_owned_##name##_t z_##name(size_t capacity) { \ - z_owned_##name##_t channel; \ - channel.storage = storage_init_f(capacity); \ - channel.send = z_##send_closure_name(z_##name##_push, NULL, channel.storage); \ - channel.recv = z_##recv_closure_name(z_##name##_pull, NULL, channel.storage); \ - return channel; \ - } \ - static inline void z_##name##_drop(z_owned_##name##_t *channel) { \ - storage_free_f(channel->storage, z_##name##_elem_free); \ - z_##send_closure_name##_drop(&channel->send); \ - z_##recv_closure_name##_drop(&channel->recv); \ +#define _Z_CHANNEL_DEFINE(name, collection_type, send_closure_name, recv_closure_name, send_type, recv_type, \ + channel_push_f, channel_pull_f, collection_new_f, collection_free_f, elem_copy_f, \ + elem_convert_f, elem_free_f) \ + typedef struct { \ + z_owned_##send_closure_name##_t send; \ + z_owned_##recv_closure_name##_t recv; \ + collection_type *collection; \ + } z_owned_##name##_t; \ + \ + static inline void _z_##name##_elem_free(void **elem) { \ + elem_free_f((recv_type *)*elem); \ + *elem = NULL; \ + } \ + static inline void _z_##name##_elem_copy(void *dst, const void *src) { \ + elem_copy_f((recv_type *)dst, (const recv_type *)src); \ + } \ + static inline void _z_##name##_push(const send_type *elem, void *context) { \ + void *internal_elem = elem_convert_f(elem); \ + if (internal_elem == NULL) { \ + return; \ + } \ + int8_t res = channel_push_f(internal_elem, context, _z_##name##_elem_free); \ + if (res) { \ + _Z_ERROR("%s failed: %i", #channel_push_f, res); \ + } \ + } \ + static inline void _z_##name##_pull(recv_type *elem, void *context) { \ + int8_t res = channel_pull_f(elem, context, _z_##name##_elem_copy); \ + if (res) { \ + _Z_ERROR("%s failed: %i", #channel_pull_f, res); \ + } \ + } \ + \ + static inline z_owned_##name##_t z_##name(size_t capacity) { \ + z_owned_##name##_t channel; \ + channel.collection = collection_new_f(capacity); \ + channel.send = z_##send_closure_name(_z_##name##_push, NULL, channel.collection); \ + channel.recv = z_##recv_closure_name(_z_##name##_pull, NULL, channel.collection); \ + return channel; \ + } \ + static inline z_owned_##name##_t *z_##name##_move(z_owned_##name##_t *val) { return val; } \ + static inline void z_##name##_drop(z_owned_##name##_t *channel) { \ + collection_free_f(channel->collection, _z_##name##_elem_free); \ + z_##send_closure_name##_drop(&channel->send); \ + z_##recv_closure_name##_drop(&channel->recv); \ } // z_owned_sample_ring_channel_t -_Z_CHANNEL_DEFINE(sample_ring_channel, _z_channel_ring_t, closure_sample, closure_owned_sample, z_sample_t, - z_owned_sample_t, _z_channel_ring_push, _z_channel_ring_pull, _z_channel_ring, _z_channel_ring_clear, - _z_owned_sample_copy, _z_sample_to_owned_ptr, z_sample_drop) +_Z_CHANNEL_DEFINE(sample_ring_channel, _z_ring_mt_t, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, + _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt, _z_ring_mt_free, _z_owned_sample_copy, + _z_sample_to_owned_ptr, z_sample_drop) // z_owned_sample_fifo_channel_t -_Z_CHANNEL_DEFINE(sample_fifo_channel, _z_channel_fifo_t, closure_sample, closure_owned_sample, z_sample_t, - z_owned_sample_t, _z_channel_fifo_push, _z_channel_fifo_pull, _z_channel_fifo, _z_channel_fifo_clear, - _z_owned_sample_copy, _z_sample_to_owned_ptr, z_sample_drop) +_Z_CHANNEL_DEFINE(sample_fifo_channel, _z_fifo_mt_t, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, + _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt, _z_fifo_mt_free, _z_owned_sample_copy, + _z_sample_to_owned_ptr, z_sample_drop) #endif // INCLUDE_ZENOH_PICO_API_HANDLERS_H diff --git a/include/zenoh-pico/api/macros.h b/include/zenoh-pico/api/macros.h index 339fb9570..e64f1f84a 100644 --- a/include/zenoh-pico/api/macros.h +++ b/include/zenoh-pico/api/macros.h @@ -144,7 +144,9 @@ z_owned_closure_reply_t : z_closure_reply_move, \ z_owned_closure_hello_t : z_closure_hello_move, \ z_owned_closure_zid_t : z_closure_zid_move, \ - z_owned_sample_t : z_sample_move \ + z_owned_sample_t : z_sample_move, \ + z_owned_sample_ring_channel_t : z_sample_ring_channel_move, \ + z_owned_sample_fifo_channel_t : z_sample_fifo_channel_move \ )(&x) /** diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index c84895efc..022e1c729 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -563,7 +563,28 @@ z_keyexpr_t z_query_keyexpr(const z_query_t *query); * Returns a new sample closure. */ z_owned_closure_sample_t z_closure_sample(_z_data_handler_t call, _z_dropper_handler_t drop, void *context); -// TODO(sashacmc): comment, correct place + +/** + * Return a new sample closure. + * It consists on a structure that contains all the elements for stateful, memory-leak-free callbacks. + * + * Like all ``z_owned_X_t``, an instance will be destroyed by any function which takes a mutable pointer to said + * instance, as this implies the instance's inners were moved. To make this fact more obvious when reading your code, + * consider using ``z_move(val)`` instead of ``&val`` as the argument. After a ``z_move``, ``val`` will still exist, but + * will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your ``val`` + * is valid. + * + * To check if ``val`` is still valid, you may use ``z_closure_owned_sample_check(&val)`` or ``z_check(val)`` if your + * compiler supports ``_Generic``, which will return ``true`` if ``val`` is valid, or ``false`` otherwise. + * + * Parameters: + * call: the typical callback function. ``context`` will be passed as its last argument. + * drop: allows the callback's state to be freed. ``context`` will be passed as its last argument. + * context: a pointer to an arbitrary state. + * + * Returns: + * Returns a new sample closure. + */ z_owned_closure_owned_sample_t z_closure_owned_sample(_z_owned_sample_handler_t call, _z_dropper_handler_t drop, void *context); diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index 95b2b4a4f..4a39ccaec 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -469,7 +469,17 @@ typedef struct { void z_closure_sample_call(const z_owned_closure_sample_t *closure, const z_sample_t *sample); -// TODO(sashacmc): comment, correct place +/** + * Represents the owned sample closure. + * + * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks. + * + * Members: + * _z_owned_sample_handler_t call: `void *call(const struct z_owned_sample_t*, const void *context)` is the callback + * function. + * _z_dropper_handler_t drop: `void *drop(void*)` allows the callback's state to be freed. void *context: a + * pointer to an arbitrary state. + */ typedef struct { void *context; _z_owned_sample_handler_t call; diff --git a/include/zenoh-pico/collections/fifo_mt.h b/include/zenoh-pico/collections/fifo_mt.h new file mode 100644 index 000000000..b754b22a6 --- /dev/null +++ b/include/zenoh-pico/collections/fifo_mt.h @@ -0,0 +1,43 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#ifndef ZENOH_PICO_COLLECTIONS_FIFO_MT_H +#define ZENOH_PICO_COLLECTIONS_FIFO_MT_H + +#include + +#include "zenoh-pico/collections/element.h" +#include "zenoh-pico/collections/fifo.h" +#include "zenoh-pico/system/platform.h" + +/*-------- Fifo Buffer Multithreaded --------*/ +typedef struct { + _z_fifo_t _fifo; +#if Z_FEATURE_MULTI_THREAD == 1 + zp_mutex_t _mutex; + zp_condvar_t _cv_not_full; + zp_condvar_t _cv_not_empty; +#endif +} _z_fifo_mt_t; + +int8_t _z_fifo_mti_init(size_t capacity); +_z_fifo_mt_t *_z_fifo_mt(size_t capacity); + +void _z_fifo_mt_clear(_z_fifo_mt_t *fifo, z_element_free_f free_f); +void _z_fifo_mt_free(_z_fifo_mt_t *fifo, z_element_free_f free_f); + +int8_t _z_fifo_mt_push(const void *src, void *context, z_element_free_f element_free); + +int8_t _z_fifo_mt_pull(void *dst, void *context, z_element_copy_f element_copy); + +#endif // ZENOH_PICO_COLLECTIONS_FIFO_MT_H diff --git a/include/zenoh-pico/collections/ring_mt.h b/include/zenoh-pico/collections/ring_mt.h new file mode 100644 index 000000000..930046747 --- /dev/null +++ b/include/zenoh-pico/collections/ring_mt.h @@ -0,0 +1,41 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#ifndef ZENOH_PICO_COLLECTIONS_RING_MT_H +#define ZENOH_PICO_COLLECTIONS_RING_MT_H + +#include + +#include "zenoh-pico/collections/element.h" +#include "zenoh-pico/collections/fifo.h" +#include "zenoh-pico/system/platform.h" + +/*-------- Ring Buffer Multithreaded --------*/ +typedef struct { + _z_ring_t _ring; +#if Z_FEATURE_MULTI_THREAD == 1 + zp_mutex_t _mutex; +#endif +} _z_ring_mt_t; + +int8_t _z_ring_mt_init(_z_ring_mt_t *ring, size_t capacity); +_z_ring_mt_t *_z_ring_mt(size_t capacity); + +void _z_ring_mt_clear(_z_ring_mt_t *ring, z_element_free_f free_f); +void _z_ring_mt_free(_z_ring_mt_t *ring, z_element_free_f free_f); + +int8_t _z_ring_mt_push(const void *src, void *context, z_element_free_f element_free); + +int8_t _z_ring_mt_pull(void *dst, void *context, z_element_copy_f element_copy); + +#endif // ZENOH_PICO_COLLECTIONS_RING_MT_H diff --git a/src/api/handlers.c b/src/api/handlers.c index 281873255..f82d0f94f 100644 --- a/src/api/handlers.c +++ b/src/api/handlers.c @@ -1,5 +1,5 @@ // -// Copyright (c) 2022 ZettaScale Technology +// Copyright (c) 2024 ZettaScale Technology // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License 2.0 which is available at @@ -13,170 +13,22 @@ // #include "zenoh-pico/api/handlers.h" - -#include "zenoh-pico/api/macros.h" #include "zenoh-pico/net/memory.h" -#include "zenoh-pico/protocol/core.h" #include "zenoh-pico/system/platform.h" -// -- Ring -void _z_channel_ring_push(const void *elem, void *context, z_element_free_f element_free) { - if (elem == NULL || context == NULL) { - return; - } - - _z_channel_ring_t *r = (_z_channel_ring_t *)context; -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&r->_mutex); -#endif - - _z_ring_push_force_drop(&r->_ring, (void *)elem, element_free); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&r->_mutex); -#endif -} - -int8_t _z_channel_ring_pull(void *dst, void *context, z_element_copy_f element_copy) { - int8_t ret = _Z_RES_OK; - - _z_channel_ring_t *r = (_z_channel_ring_t *)context; - -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&r->_mutex); -#endif - - void *src = _z_ring_pull(&r->_ring); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&r->_mutex); -#endif - - if (src == NULL) { - dst = NULL; - } else { - element_copy(dst, src); - } - return ret; -} - -_z_channel_ring_t *_z_channel_ring(size_t capacity) { - _z_channel_ring_t *ring = (_z_channel_ring_t *)zp_malloc(sizeof(_z_channel_ring_t)); - if (ring == NULL) { - return NULL; - } - - int8_t res = _z_ring_init(&ring->_ring, capacity); - if (res != _Z_RES_OK) { - zp_free(ring); - return NULL; - } - -#if Z_FEATURE_MULTI_THREAD == 1 - res = zp_mutex_init(&ring->_mutex); - if (res != _Z_RES_OK) { - // TODO(sashacmc): add logging - zp_free(ring); - return NULL; - } -#endif - - return ring; +// -- Sample +void _z_owned_sample_copy(z_owned_sample_t *dst, const z_owned_sample_t *src) { + memcpy(dst, src, sizeof(z_owned_sample_t)); + // TODO(sashacmc): is it ok? + // Why not malloc + + // _z_sample_copy(dst->_value, src->_value); } -void _z_channel_ring_clear(_z_channel_ring_t *ring, z_element_free_f free_f) { -#if Z_FEATURE_MULTI_THREAD == 1 - int8_t res = zp_mutex_free(&ring->_mutex); - if (res != _Z_RES_OK) { - // TODO(sashacmc): add logging - return; +z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src) { + z_owned_sample_t *dst = (z_owned_sample_t *)zp_malloc(sizeof(z_owned_sample_t)); + if (dst && src) { + dst->_value = (_z_sample_t *)zp_malloc(sizeof(_z_sample_t)); + _z_sample_copy(dst->_value, src); } -#endif - - _z_ring_clear(&ring->_ring, free_f); - zp_free(ring); -} - -// -- Fifo -void _z_channel_fifo_push(const void *elem, void *context, z_element_free_f element_free) { - if (elem == NULL || context == NULL) { - return; - } - - _z_channel_fifo_t *f = (_z_channel_fifo_t *)context; - -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&f->_mutex); - while (elem != NULL) { - elem = _z_fifo_push(&f->_fifo, (void *)elem); - if (elem != NULL) { - zp_condvar_wait(&f->_cv_not_full, &f->_mutex); - } else { - zp_condvar_signal(&f->_cv_not_empty); - } - } - zp_mutex_unlock(&f->_mutex); -#else // Z_FEATURE_MULTI_THREAD == 1 - _z_fifo_push_drop(&f->_fifo, elem, element_free); -#endif // Z_FEATURE_MULTI_THREAD == 1 -} - -int8_t _z_channel_fifo_pull(void *dst, void *context, z_element_copy_f element_copy) { - int8_t ret = _Z_RES_OK; - - _z_channel_fifo_t *f = (_z_channel_fifo_t *)context; - -#if Z_FEATURE_MULTI_THREAD == 1 - void *src = NULL; - zp_mutex_lock(&f->_mutex); - while (src == NULL) { - src = _z_fifo_pull(&f->_fifo); - if (src == NULL) { - zp_condvar_wait(&f->_cv_not_empty, &f->_mutex); - } else { - zp_condvar_signal(&f->_cv_not_full); - } - } - zp_mutex_unlock(&f->_mutex); - element_copy(dst, src); -#else // Z_FEATURE_MULTI_THREAD == 1 - void *src = _z_fifo_pull(&f->_fifo); - if (src != NULL) { - element_copy(dst, src); - } -#endif // Z_FEATURE_MULTI_THREAD == 1 - - return ret; -} - -_z_channel_fifo_t *_z_channel_fifo(size_t capacity) { - _z_channel_fifo_t *fifo = (_z_channel_fifo_t *)zp_malloc(sizeof(_z_channel_fifo_t)); - if (fifo == NULL) { - return NULL; - } - - int8_t res = _z_fifo_init(&fifo->_fifo, capacity); - if (res != _Z_RES_OK) { - zp_free(fifo); - return NULL; - } - -#if Z_FEATURE_MULTI_THREAD == 1 - // TODO(sashacmc): result error check - res = zp_mutex_init(&fifo->_mutex); - res = zp_condvar_init(&fifo->_cv_not_full); - res = zp_condvar_init(&fifo->_cv_not_empty); -#endif - - return fifo; -} - -void _z_channel_fifo_clear(_z_channel_fifo_t *fifo, z_element_free_f free_f) { -#if Z_FEATURE_MULTI_THREAD == 1 - // TODO(sashacmc): result error check - int8_t res = zp_mutex_free(&fifo->_mutex); - res = zp_condvar_free(&fifo->_cv_not_full); - res = zp_condvar_free(&fifo->_cv_not_empty); -#endif - - _z_fifo_clear(&fifo->_fifo, free_f); - zp_free(fifo); + return dst; } diff --git a/src/collections/fifo_mt.c b/src/collections/fifo_mt.c new file mode 100644 index 000000000..31e1d448d --- /dev/null +++ b/src/collections/fifo_mt.c @@ -0,0 +1,147 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/collections/fifo_mt.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/utils/logging.h" + +/*-------- Fifo Buffer Multithreaded --------*/ +int8_t _z_fifo_mt_init(_z_fifo_mt_t *fifo, size_t capacity) { + int8_t res = _z_fifo_init(&fifo->_fifo, capacity); + if (res) { + return res; + } + +#if Z_FEATURE_MULTI_THREAD == 1 + res = zp_mutex_init(&fifo->_mutex); + if (res) { + return res; + } + res = zp_condvar_init(&fifo->_cv_not_full); + if (res) { + return res; + } + res = zp_condvar_init(&fifo->_cv_not_empty); + if (res) { + return res; + } +#endif + + return _Z_RES_OK; +} + +_z_fifo_mt_t *_z_fifo_mt(size_t capacity) { + _z_fifo_mt_t *fifo = (_z_fifo_mt_t *)zp_malloc(sizeof(_z_fifo_mt_t)); + if (fifo == NULL) { + _Z_ERROR("zp_malloc failed"); + return NULL; + } + + int8_t res = _z_fifo_mt_init(fifo, capacity); + if (res) { + _Z_ERROR("_z_fifo_mt_init failed: %i", res); + zp_free(fifo); + return NULL; + } + + return fifo; +} + +void _z_fifo_mt_clear(_z_fifo_mt_t *fifo, z_element_free_f free_f) { +#if Z_FEATURE_MULTI_THREAD == 1 + zp_mutex_free(&fifo->_mutex); + zp_condvar_free(&fifo->_cv_not_full); + zp_condvar_free(&fifo->_cv_not_empty); +#endif + + _z_fifo_clear(&fifo->_fifo, free_f); +} + +void _z_fifo_mt_free(_z_fifo_mt_t *fifo, z_element_free_f free_f) { + _z_fifo_mt_clear(fifo, free_f); + zp_free(fifo); +} + +int8_t _z_fifo_mt_push(const void *elem, void *context, z_element_free_f element_free) { + if (elem == NULL || context == NULL) { + return _Z_ERR_GENERIC; + } + + _z_fifo_mt_t *f = (_z_fifo_mt_t *)context; + +#if Z_FEATURE_MULTI_THREAD == 1 + int res = zp_mutex_lock(&f->_mutex); + if (res) { + return res; + } + while (elem != NULL) { + elem = _z_fifo_push(&f->_fifo, (void *)elem); + if (elem != NULL) { + res = zp_condvar_wait(&f->_cv_not_full, &f->_mutex); + if (res) { + return res; + } + } else { + res = zp_condvar_signal(&f->_cv_not_empty); + if (res) { + return res; + } + } + } + res = zp_mutex_unlock(&f->_mutex); + if (res) { + return res; + } +#else // Z_FEATURE_MULTI_THREAD == 1 + _z_fifo_push_drop(&f->_fifo, elem, element_free); +#endif // Z_FEATURE_MULTI_THREAD == 1 +} + +int8_t _z_fifo_mt_pull(void *dst, void *context, z_element_copy_f element_copy) { + _z_fifo_mt_t *f = (_z_fifo_mt_t *)context; + +#if Z_FEATURE_MULTI_THREAD == 1 + void *src = NULL; + int res = zp_mutex_lock(&f->_mutex); + if (res) { + return res; + } + while (src == NULL) { + src = _z_fifo_pull(&f->_fifo); + if (src == NULL) { + res = zp_condvar_wait(&f->_cv_not_empty, &f->_mutex); + if (res) { + return res; + } + } else { + res = zp_condvar_signal(&f->_cv_not_full); + if (res) { + return res; + } + } + } + res = zp_mutex_unlock(&f->_mutex); + if (res) { + return res; + } + element_copy(dst, src); +#else // Z_FEATURE_MULTI_THREAD == 1 + void *src = _z_fifo_pull(&f->_fifo); + if (src != NULL) { + element_copy(dst, src); + } +#endif // Z_FEATURE_MULTI_THREAD == 1 + + return _Z_RES_OK; +} diff --git a/src/collections/ring_mt.c b/src/collections/ring_mt.c new file mode 100644 index 000000000..d75da21f1 --- /dev/null +++ b/src/collections/ring_mt.c @@ -0,0 +1,114 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/collections/ring_mt.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/utils/logging.h" + +/*-------- Ring Buffer Multithreaded --------*/ +int8_t _z_ring_mt_init(_z_ring_mt_t *ring, size_t capacity) { + int8_t res = _z_ring_init(&ring->_ring, capacity); + if (res) { + return res; + } + +#if Z_FEATURE_MULTI_THREAD == 1 + res = zp_mutex_init(&ring->_mutex); + if (res) { + return res; + } +#endif + return _Z_RES_OK; +} + +_z_ring_mt_t *_z_ring_mt(size_t capacity) { + _z_ring_mt_t *ring = (_z_ring_mt_t *)zp_malloc(sizeof(_z_ring_mt_t)); + if (ring == NULL) { + _Z_ERROR("zp_malloc failed"); + return NULL; + } + + int8_t res = _z_ring_mt_init(ring, capacity); + if (res) { + _Z_ERROR("_z_ring_mt_init failed: %i", res); + return NULL; + } + + return ring; +} + +void _z_ring_mt_clear(_z_ring_mt_t *ring, z_element_free_f free_f) { +#if Z_FEATURE_MULTI_THREAD == 1 + zp_mutex_free(&ring->_mutex); +#endif + + _z_ring_clear(&ring->_ring, free_f); +} + +void _z_ring_mt_free(_z_ring_mt_t *ring, z_element_free_f free_f) { + _z_ring_mt_clear(ring, free_f); + + zp_free(ring); +} + +int8_t _z_ring_mt_push(const void *elem, void *context, z_element_free_f element_free) { + if (elem == NULL || context == NULL) { + return _Z_ERR_GENERIC; + } + + _z_ring_mt_t *r = (_z_ring_mt_t *)context; + +#if Z_FEATURE_MULTI_THREAD == 1 + int8_t res = zp_mutex_lock(&r->_mutex); + if (res) { + return res; + } +#endif + + _z_ring_push_force_drop(&r->_ring, (void *)elem, element_free); + +#if Z_FEATURE_MULTI_THREAD == 1 + res = zp_mutex_unlock(&r->_mutex); + if (res) { + return res; + } +#endif +} + +int8_t _z_ring_mt_pull(void *dst, void *context, z_element_copy_f element_copy) { + _z_ring_mt_t *r = (_z_ring_mt_t *)context; + +#if Z_FEATURE_MULTI_THREAD == 1 + int res = zp_mutex_lock(&r->_mutex); + if (res) { + return res; + } +#endif + + void *src = _z_ring_pull(&r->_ring); + +#if Z_FEATURE_MULTI_THREAD == 1 + res = zp_mutex_unlock(&r->_mutex); + if (res) { + return res; + } +#endif + + if (src == NULL) { + dst = NULL; + } else { + element_copy(dst, src); + } + return _Z_RES_OK; +}