Skip to content

Commit

Permalink
Refactoring, cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Apr 4, 2024
1 parent 7472934 commit 686b9f9
Show file tree
Hide file tree
Showing 10 changed files with 451 additions and 257 deletions.
1 change: 1 addition & 0 deletions examples/unix/c11/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ int main(int argc, char **argv) {
}

z_undeclare_subscriber(z_move(sub));
z_drop(z_move(channel));

// Stop read and lease tasks for zenoh-pico
zp_stop_read_task(z_loan(s));
Expand Down
149 changes: 56 additions & 93 deletions include/zenoh-pico/api/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,111 +15,74 @@
#define INCLUDE_ZENOH_PICO_API_HANDLERS_H

#include <stdint.h>
#include <stdio.h>

#include "zenoh-pico/api/macros.h"
#include "zenoh-pico/api/primitives.h"
#include "zenoh-pico/api/types.h"
#include "zenoh-pico/collections/element.h"
#include "zenoh-pico/collections/fifo.h"
#include "zenoh-pico/collections/ring.h"
#include "zenoh-pico/net/memory.h"
#include "zenoh-pico/system/platform.h"

// -- Ring
typedef struct {
_z_ring_t _ring;
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_t _mutex;
#endif
} _z_channel_ring_t; // TODO(sashacmc): rename to sync_ring?

_z_channel_ring_t *_z_channel_ring(size_t capacity);
void _z_channel_ring_push(const void *src, void *context, z_element_free_f element_free);
int8_t _z_channel_ring_pull(void *dst, void *context, z_element_copy_f element_copy);
void _z_channel_ring_clear(_z_channel_ring_t *ring, z_element_free_f free_f);

// -- Fifo
typedef struct {
_z_fifo_t _fifo;
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_t _mutex;
zp_condvar_t _cv_not_full;
zp_condvar_t _cv_not_empty;
#endif
} _z_channel_fifo_t;

_z_channel_fifo_t *_z_channel_fifo(size_t capacity);
void _z_channel_fifo_push(const void *src, void *context, z_element_free_f element_free);
int8_t _z_channel_fifo_pull(void *dst, void *context, z_element_copy_f element_copy);
void _z_channel_fifo_clear(_z_channel_fifo_t *fifo, z_element_free_f free_f);
#include "zenoh-pico/collections/fifo_mt.h"
#include "zenoh-pico/collections/ring_mt.h"
#include "zenoh-pico/utils/logging.h"

// -- Samples handler
static inline size_t _z_owned_sample_size(z_owned_sample_t *s) { return sizeof(*s); }

static inline void _z_owned_sample_copy(z_owned_sample_t *dst, const z_owned_sample_t *src) {
memcpy(dst, src, sizeof(z_owned_sample_t));
// TODO(sashacmc): is it ok?
// Why not malloc +
// _z_sample_copy(dst->_value, src->_value);
}

static inline z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src) {
z_owned_sample_t *dst = (z_owned_sample_t *)zp_malloc(sizeof(z_owned_sample_t));
if (dst && src) {
dst->_value = (_z_sample_t *)zp_malloc(sizeof(_z_sample_t));
_z_sample_copy(dst->_value, src);
}
return dst;
}
void _z_owned_sample_copy(z_owned_sample_t *dst, const z_owned_sample_t *src);
z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src);

// -- Channel
#define _Z_CHANNEL_DEFINE(name, storage_type, send_closure_name, recv_closure_name, send_type, recv_type, \
channel_push_f, channel_pull_f, storage_init_f, storage_free_f, elem_copy_f, elem_convert_f, \
elem_free_f) \
typedef struct { \
z_owned_##send_closure_name##_t send; \
z_owned_##recv_closure_name##_t recv; \
storage_type *storage; \
} z_owned_##name##_t; \
static inline void z_##name##_elem_free(void **elem) { \
elem_free_f((recv_type *)*elem); \
*elem = NULL; \
} \
static inline void z_##name##_elem_copy(void *dst, const void *src) { \
elem_copy_f((recv_type *)dst, (const recv_type *)src); \
} \
static inline void z_##name##_push(const send_type *elem, void *context) { \
void *internal_elem = elem_convert_f(elem); \
if (internal_elem == NULL) { \
return; \
} \
channel_push_f(internal_elem, context, z_##name##_elem_free); \
} \
static inline void z_##name##_pull(recv_type *elem, void *context) { \
channel_pull_f(elem, context, z_##name##_elem_copy); \
} \
static inline z_owned_##name##_t z_##name(size_t capacity) { \
z_owned_##name##_t channel; \
channel.storage = storage_init_f(capacity); \
channel.send = z_##send_closure_name(z_##name##_push, NULL, channel.storage); \
channel.recv = z_##recv_closure_name(z_##name##_pull, NULL, channel.storage); \
return channel; \
} \
static inline void z_##name##_drop(z_owned_##name##_t *channel) { \
storage_free_f(channel->storage, z_##name##_elem_free); \
z_##send_closure_name##_drop(&channel->send); \
z_##recv_closure_name##_drop(&channel->recv); \
#define _Z_CHANNEL_DEFINE(name, collection_type, send_closure_name, recv_closure_name, send_type, recv_type, \
channel_push_f, channel_pull_f, collection_new_f, collection_free_f, elem_copy_f, \
elem_convert_f, elem_free_f) \
typedef struct { \
z_owned_##send_closure_name##_t send; \
z_owned_##recv_closure_name##_t recv; \
collection_type *collection; \
} z_owned_##name##_t; \
\
static inline void _z_##name##_elem_free(void **elem) { \
elem_free_f((recv_type *)*elem); \
*elem = NULL; \
} \
static inline void _z_##name##_elem_copy(void *dst, const void *src) { \
elem_copy_f((recv_type *)dst, (const recv_type *)src); \
} \
static inline void _z_##name##_push(const send_type *elem, void *context) { \
void *internal_elem = elem_convert_f(elem); \
if (internal_elem == NULL) { \
return; \
} \
int8_t res = channel_push_f(internal_elem, context, _z_##name##_elem_free); \
if (res) { \
_Z_ERROR("%s failed: %i", #channel_push_f, res); \
} \
} \
static inline void _z_##name##_pull(recv_type *elem, void *context) { \
int8_t res = channel_pull_f(elem, context, _z_##name##_elem_copy); \
if (res) { \
_Z_ERROR("%s failed: %i", #channel_pull_f, res); \
} \
} \
\
static inline z_owned_##name##_t z_##name(size_t capacity) { \
z_owned_##name##_t channel; \
channel.collection = collection_new_f(capacity); \
channel.send = z_##send_closure_name(_z_##name##_push, NULL, channel.collection); \
channel.recv = z_##recv_closure_name(_z_##name##_pull, NULL, channel.collection); \
return channel; \
} \
static inline z_owned_##name##_t *z_##name##_move(z_owned_##name##_t *val) { return val; } \
static inline void z_##name##_drop(z_owned_##name##_t *channel) { \
collection_free_f(channel->collection, _z_##name##_elem_free); \
z_##send_closure_name##_drop(&channel->send); \
z_##recv_closure_name##_drop(&channel->recv); \
}

// z_owned_sample_ring_channel_t
_Z_CHANNEL_DEFINE(sample_ring_channel, _z_channel_ring_t, closure_sample, closure_owned_sample, z_sample_t,
z_owned_sample_t, _z_channel_ring_push, _z_channel_ring_pull, _z_channel_ring, _z_channel_ring_clear,
_z_owned_sample_copy, _z_sample_to_owned_ptr, z_sample_drop)
_Z_CHANNEL_DEFINE(sample_ring_channel, _z_ring_mt_t, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t,
_z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt, _z_ring_mt_free, _z_owned_sample_copy,
_z_sample_to_owned_ptr, z_sample_drop)

// z_owned_sample_fifo_channel_t
_Z_CHANNEL_DEFINE(sample_fifo_channel, _z_channel_fifo_t, closure_sample, closure_owned_sample, z_sample_t,
z_owned_sample_t, _z_channel_fifo_push, _z_channel_fifo_pull, _z_channel_fifo, _z_channel_fifo_clear,
_z_owned_sample_copy, _z_sample_to_owned_ptr, z_sample_drop)
_Z_CHANNEL_DEFINE(sample_fifo_channel, _z_fifo_mt_t, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t,
_z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt, _z_fifo_mt_free, _z_owned_sample_copy,
_z_sample_to_owned_ptr, z_sample_drop)

#endif // INCLUDE_ZENOH_PICO_API_HANDLERS_H
4 changes: 3 additions & 1 deletion include/zenoh-pico/api/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@
z_owned_closure_reply_t : z_closure_reply_move, \
z_owned_closure_hello_t : z_closure_hello_move, \
z_owned_closure_zid_t : z_closure_zid_move, \
z_owned_sample_t : z_sample_move \
z_owned_sample_t : z_sample_move, \
z_owned_sample_ring_channel_t : z_sample_ring_channel_move, \
z_owned_sample_fifo_channel_t : z_sample_fifo_channel_move \
)(&x)

/**
Expand Down
23 changes: 22 additions & 1 deletion include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,28 @@ z_keyexpr_t z_query_keyexpr(const z_query_t *query);
* Returns a new sample closure.
*/
z_owned_closure_sample_t z_closure_sample(_z_data_handler_t call, _z_dropper_handler_t drop, void *context);
// TODO(sashacmc): comment, correct place

/**
* Return a new sample closure.
* It consists on a structure that contains all the elements for stateful, memory-leak-free callbacks.
*
* Like all ``z_owned_X_t``, an instance will be destroyed by any function which takes a mutable pointer to said
* instance, as this implies the instance's inners were moved. To make this fact more obvious when reading your code,
* consider using ``z_move(val)`` instead of ``&val`` as the argument. After a ``z_move``, ``val`` will still exist, but
* will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your ``val``
* is valid.
*
* To check if ``val`` is still valid, you may use ``z_closure_owned_sample_check(&val)`` or ``z_check(val)`` if your
* compiler supports ``_Generic``, which will return ``true`` if ``val`` is valid, or ``false`` otherwise.
*
* Parameters:
* call: the typical callback function. ``context`` will be passed as its last argument.
* drop: allows the callback's state to be freed. ``context`` will be passed as its last argument.
* context: a pointer to an arbitrary state.
*
* Returns:
* Returns a new sample closure.
*/
z_owned_closure_owned_sample_t z_closure_owned_sample(_z_owned_sample_handler_t call, _z_dropper_handler_t drop,
void *context);

Expand Down
12 changes: 11 additions & 1 deletion include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,17 @@ typedef struct {

void z_closure_sample_call(const z_owned_closure_sample_t *closure, const z_sample_t *sample);

// TODO(sashacmc): comment, correct place
/**
* Represents the owned sample closure.
*
* A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks.
*
* Members:
* _z_owned_sample_handler_t call: `void *call(const struct z_owned_sample_t*, const void *context)` is the callback
* function.
* _z_dropper_handler_t drop: `void *drop(void*)` allows the callback's state to be freed. void *context: a
* pointer to an arbitrary state.
*/
typedef struct {
void *context;
_z_owned_sample_handler_t call;
Expand Down
43 changes: 43 additions & 0 deletions include/zenoh-pico/collections/fifo_mt.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#ifndef ZENOH_PICO_COLLECTIONS_FIFO_MT_H
#define ZENOH_PICO_COLLECTIONS_FIFO_MT_H

#include <stdint.h>

#include "zenoh-pico/collections/element.h"
#include "zenoh-pico/collections/fifo.h"
#include "zenoh-pico/system/platform.h"

/*-------- Fifo Buffer Multithreaded --------*/
typedef struct {
_z_fifo_t _fifo;
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_t _mutex;
zp_condvar_t _cv_not_full;
zp_condvar_t _cv_not_empty;
#endif
} _z_fifo_mt_t;

int8_t _z_fifo_mti_init(size_t capacity);
_z_fifo_mt_t *_z_fifo_mt(size_t capacity);

void _z_fifo_mt_clear(_z_fifo_mt_t *fifo, z_element_free_f free_f);
void _z_fifo_mt_free(_z_fifo_mt_t *fifo, z_element_free_f free_f);

int8_t _z_fifo_mt_push(const void *src, void *context, z_element_free_f element_free);

int8_t _z_fifo_mt_pull(void *dst, void *context, z_element_copy_f element_copy);

#endif // ZENOH_PICO_COLLECTIONS_FIFO_MT_H
41 changes: 41 additions & 0 deletions include/zenoh-pico/collections/ring_mt.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#ifndef ZENOH_PICO_COLLECTIONS_RING_MT_H
#define ZENOH_PICO_COLLECTIONS_RING_MT_H

#include <stdint.h>

#include "zenoh-pico/collections/element.h"
#include "zenoh-pico/collections/fifo.h"
#include "zenoh-pico/system/platform.h"

/*-------- Ring Buffer Multithreaded --------*/
typedef struct {
_z_ring_t _ring;
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_t _mutex;
#endif
} _z_ring_mt_t;

int8_t _z_ring_mt_init(_z_ring_mt_t *ring, size_t capacity);
_z_ring_mt_t *_z_ring_mt(size_t capacity);

void _z_ring_mt_clear(_z_ring_mt_t *ring, z_element_free_f free_f);
void _z_ring_mt_free(_z_ring_mt_t *ring, z_element_free_f free_f);

int8_t _z_ring_mt_push(const void *src, void *context, z_element_free_f element_free);

int8_t _z_ring_mt_pull(void *dst, void *context, z_element_copy_f element_copy);

#endif // ZENOH_PICO_COLLECTIONS_RING_MT_H
Loading

0 comments on commit 686b9f9

Please sign in to comment.