Skip to content

Commit

Permalink
Add matching subscribers state cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jan 14, 2025
1 parent a5da5f8 commit 713f6fe
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 12 deletions.
11 changes: 9 additions & 2 deletions include/zenoh-pico/session/matching.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,15 @@ typedef struct {
_z_matching_listener_ctx_t *ctx;
} _z_matching_listener_state_t;

_Z_ELEM_DEFINE(_z_matching_listener, _z_matching_listener_state_t, _z_noop_size, _z_noop_clear, _z_noop_copy,
_z_noop_move)
_z_matching_listener_ctx_t *_z_matching_listener_ctx_new(_z_closure_matching_status_t callback);
void _z_matching_listener_ctx_clear(_z_matching_listener_ctx_t *ctx);

_z_matching_listener_state_t *_z_matching_listener_state_new(uint32_t interest_id, _z_zint_t entity_id,
_z_matching_listener_ctx_t *ctx);
void _z_matching_listener_state_clear(_z_matching_listener_state_t *state);

_Z_ELEM_DEFINE(_z_matching_listener, _z_matching_listener_state_t, _z_noop_size, _z_matching_listener_state_clear,
_z_noop_copy, _z_noop_move)
_Z_INT_MAP_DEFINE(_z_matching_listener, _z_matching_listener_state_t)
#endif

Expand Down
16 changes: 6 additions & 10 deletions src/net/matching.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/session/matching.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/utils/logging.h"
#include "zenoh-pico/utils/result.h"

#if Z_FEATURE_MATCHING == 1
Expand Down Expand Up @@ -55,28 +54,25 @@ _z_matching_listener_t _z_matching_listener_declare(_z_session_rc_t *zn, const _
_z_closure_matching_status_t callback) {
uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED |
_Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE;
_z_matching_listener_ctx_t *ctx = z_malloc(sizeof(_z_matching_listener_ctx_t));

_z_matching_listener_t ret = _z_matching_listener_null();

_z_matching_listener_ctx_t *ctx = _z_matching_listener_ctx_new(callback);
if (ctx == NULL) {
return ret;
}
ctx->decl_id = 0;
ctx->callback = callback;

ret._interest_id = _z_add_interest(_Z_RC_IN_VAL(zn), _z_keyexpr_alias_from_user_defined(*key, true),
_z_matching_listener_callback, flags, (void *)ctx);
if (ret._interest_id == 0) {
z_free(ctx);
_z_matching_listener_ctx_clear(ctx);
return ret;
}

ret._id = _z_get_entity_id(_Z_RC_IN_VAL(zn));
ret._zn = _z_session_rc_clone_as_weak(zn);

_z_matching_listener_state_t state;
state.entity_id = entity_id;
state.interest_id = ret._interest_id;
_z_matching_listener_intmap_insert(&_Z_RC_IN_VAL(zn)->_matching_listeners, ret._id, &state);
_z_matching_listener_intmap_insert(&_Z_RC_IN_VAL(zn)->_matching_listeners, ret._id,
_z_matching_listener_state_new(ret._interest_id, entity_id, ctx));

return ret;
}
Expand Down
48 changes: 48 additions & 0 deletions src/session/matching.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//
// Copyright (c) 2025 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#include "zenoh-pico/session/matching.h"

_z_matching_listener_ctx_t *_z_matching_listener_ctx_new(_z_closure_matching_status_t callback) {
_z_matching_listener_ctx_t *ctx = z_malloc(sizeof(_z_matching_listener_ctx_t));

ctx->decl_id = 0;
ctx->callback = callback;

return ctx;
}

void _z_matching_listener_ctx_clear(_z_matching_listener_ctx_t *ctx) {
if (ctx->callback.drop != NULL) {
ctx->callback.drop(ctx->callback.context);
}
}

_z_matching_listener_state_t *_z_matching_listener_state_new(uint32_t interest_id, _z_zint_t entity_id,
_z_matching_listener_ctx_t *ctx) {
_z_matching_listener_state_t *state = z_malloc(sizeof(_z_matching_listener_state_t));

state->interest_id = interest_id;
state->entity_id = entity_id;
state->ctx = ctx;

return state;
}

void _z_matching_listener_state_clear(_z_matching_listener_state_t *state) {
if (state->ctx != NULL) {
_z_matching_listener_ctx_clear(state->ctx);
z_free(state->ctx);
}
}

0 comments on commit 713f6fe

Please sign in to comment.