From 713f6fead038998e896aad50a2624af57ff70709 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Tue, 14 Jan 2025 15:21:08 +0100 Subject: [PATCH] Add matching subscribers state cleanup --- include/zenoh-pico/session/matching.h | 11 ++++-- src/net/matching.c | 16 ++++----- src/session/matching.c | 48 +++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 12 deletions(-) create mode 100644 src/session/matching.c diff --git a/include/zenoh-pico/session/matching.h b/include/zenoh-pico/session/matching.h index 143fb13f1..6bbaab107 100644 --- a/include/zenoh-pico/session/matching.h +++ b/include/zenoh-pico/session/matching.h @@ -47,8 +47,15 @@ typedef struct { _z_matching_listener_ctx_t *ctx; } _z_matching_listener_state_t; -_Z_ELEM_DEFINE(_z_matching_listener, _z_matching_listener_state_t, _z_noop_size, _z_noop_clear, _z_noop_copy, - _z_noop_move) +_z_matching_listener_ctx_t *_z_matching_listener_ctx_new(_z_closure_matching_status_t callback); +void _z_matching_listener_ctx_clear(_z_matching_listener_ctx_t *ctx); + +_z_matching_listener_state_t *_z_matching_listener_state_new(uint32_t interest_id, _z_zint_t entity_id, + _z_matching_listener_ctx_t *ctx); +void _z_matching_listener_state_clear(_z_matching_listener_state_t *state); + +_Z_ELEM_DEFINE(_z_matching_listener, _z_matching_listener_state_t, _z_noop_size, _z_matching_listener_state_clear, + _z_noop_copy, _z_noop_move) _Z_INT_MAP_DEFINE(_z_matching_listener, _z_matching_listener_state_t) #endif diff --git a/src/net/matching.c b/src/net/matching.c index c74811e97..d7cd27e25 100644 --- a/src/net/matching.c +++ b/src/net/matching.c @@ -22,7 +22,6 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/session/matching.h" #include "zenoh-pico/session/resource.h" -#include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/result.h" #if Z_FEATURE_MATCHING == 1 @@ -55,28 +54,25 @@ _z_matching_listener_t _z_matching_listener_declare(_z_session_rc_t *zn, const _ _z_closure_matching_status_t callback) { uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE; - _z_matching_listener_ctx_t *ctx = z_malloc(sizeof(_z_matching_listener_ctx_t)); - _z_matching_listener_t ret = _z_matching_listener_null(); + + _z_matching_listener_ctx_t *ctx = _z_matching_listener_ctx_new(callback); if (ctx == NULL) { return ret; } - ctx->decl_id = 0; - ctx->callback = callback; + ret._interest_id = _z_add_interest(_Z_RC_IN_VAL(zn), _z_keyexpr_alias_from_user_defined(*key, true), _z_matching_listener_callback, flags, (void *)ctx); if (ret._interest_id == 0) { - z_free(ctx); + _z_matching_listener_ctx_clear(ctx); return ret; } ret._id = _z_get_entity_id(_Z_RC_IN_VAL(zn)); ret._zn = _z_session_rc_clone_as_weak(zn); - _z_matching_listener_state_t state; - state.entity_id = entity_id; - state.interest_id = ret._interest_id; - _z_matching_listener_intmap_insert(&_Z_RC_IN_VAL(zn)->_matching_listeners, ret._id, &state); + _z_matching_listener_intmap_insert(&_Z_RC_IN_VAL(zn)->_matching_listeners, ret._id, + _z_matching_listener_state_new(ret._interest_id, entity_id, ctx)); return ret; } diff --git a/src/session/matching.c b/src/session/matching.c new file mode 100644 index 000000000..793e2b3ba --- /dev/null +++ b/src/session/matching.c @@ -0,0 +1,48 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/session/matching.h" + +_z_matching_listener_ctx_t *_z_matching_listener_ctx_new(_z_closure_matching_status_t callback) { + _z_matching_listener_ctx_t *ctx = z_malloc(sizeof(_z_matching_listener_ctx_t)); + + ctx->decl_id = 0; + ctx->callback = callback; + + return ctx; +} + +void _z_matching_listener_ctx_clear(_z_matching_listener_ctx_t *ctx) { + if (ctx->callback.drop != NULL) { + ctx->callback.drop(ctx->callback.context); + } +} + +_z_matching_listener_state_t *_z_matching_listener_state_new(uint32_t interest_id, _z_zint_t entity_id, + _z_matching_listener_ctx_t *ctx) { + _z_matching_listener_state_t *state = z_malloc(sizeof(_z_matching_listener_state_t)); + + state->interest_id = interest_id; + state->entity_id = entity_id; + state->ctx = ctx; + + return state; +} + +void _z_matching_listener_state_clear(_z_matching_listener_state_t *state) { + if (state->ctx != NULL) { + _z_matching_listener_ctx_clear(state->ctx); + z_free(state->ctx); + } +}